You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/27 12:57:52 UTC

incubator-kylin git commit: GTAggregateScanner bug fix to pass CI

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 a499adff8 -> db6cbe288


GTAggregateScanner bug fix to pass CI


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/db6cbe28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/db6cbe28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/db6cbe28

Branch: refs/heads/0.8.0
Commit: db6cbe2887d09c21d40eb35554cf5e57bb064ebd
Parents: a499adf
Author: Li, Yang <ya...@ebay.com>
Authored: Wed May 27 18:56:40 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed May 27 18:57:25 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/HadoopUtil.java    |  11 +-
 .../kylin/dict/lookup/FileTableReader.java      |   1 +
 .../storage/gridtable/GTAggregateScanner.java   | 164 +++++--------------
 .../storage/gridtable/DictGridTableTest.java    |   2 +-
 4 files changed, 55 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 84bab94..2486d6c 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -57,11 +57,20 @@ public class HadoopUtil {
 
     public static URI makeURI(String filePath) {
         try {
-            return new URI(filePath);
+            return new URI(fixWindowsPath(filePath));
         } catch (URISyntaxException e) {
             throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
         }
     }
+    
+    public static String fixWindowsPath(String path) {
+        // Rewrite two-slash "file://" paths containing ":\" (e.g. file://C:\dir) as "file:///" with forward slashes; any other path is returned unchanged
+        if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
+            path = path.replace("file://", "file:///");
+            path = path.replace('\\', '/');
+        }
+        return path;
+    }
 
     public static Configuration newHadoopJobConfiguration() {
         Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
index e5a80c0..3631b25 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
@@ -65,6 +65,7 @@ public class FileTableReader implements TableReader {
     }
     
     public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
+        filePath = HadoopUtil.fixWindowsPath(filePath);
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index fff2b77..ef41e05 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -2,6 +2,7 @@ package org.apache.kylin.storage.gridtable;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.slf4j.Logger;
@@ -15,9 +16,12 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class GTAggregateScanner implements IGTScanner {
 
+    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
+
     final GTInfo info;
     final BitSet dimensions; // dimensions to return, can be more than group by
     final BitSet groupBy;
@@ -25,11 +29,10 @@ public class GTAggregateScanner implements IGTScanner {
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
 
-
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
         if (req.hasAggregation() == false)
             throw new IllegalStateException();
-        
+
         this.info = inputScanner.getInfo();
         this.dimensions = (BitSet) req.getColumns().clone();
         this.dimensions.andNot(req.getAggrMetrics());
@@ -43,7 +46,7 @@ public class GTAggregateScanner implements IGTScanner {
     public GTInfo getInfo() {
         return info;
     }
-    
+
     @Override
     public int getScannedRowCount() {
         return inputScanner.getScannedRowCount();
@@ -71,20 +74,25 @@ public class GTAggregateScanner implements IGTScanner {
 
     class AggregationCacheWithBytesKey {
         final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+        final int keyLength;
+        final boolean[] compareMask;
 
         public AggregationCacheWithBytesKey() {
+            compareMask = createCompareMask();
+            keyLength = compareMask.length;
             aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
                 @Override
                 public int compare(byte[] o1, byte[] o2) {
                     int result = 0;
-                    Preconditions.checkArgument(o1.length == o2.length);
-                    final int length = o1.length;
-                    for (int i = 0; i < length; ++i) {
-                        result = o1[i] - o2[i];
-                        if (result == 0) {
-                            continue;
-                        } else {
-                            return result;
+                    Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
+                    for (int i = 0; i < keyLength; ++i) {
+                        if (compareMask[i]) {
+                            result = o1[i] - o2[i];
+                            if (result == 0) {
+                                continue;
+                            } else {
+                                return result;
+                            }
                         }
                     }
                     return result;
@@ -92,10 +100,29 @@ public class GTAggregateScanner implements IGTScanner {
             });
         }
 
+        private boolean[] createCompareMask() { // builds one flag per key byte; true marks bytes the key comparator should compare
+            int keyLength = 0; // local total; shadows the keyLength field, which the constructor sets from the returned mask's length
+            for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) { // first pass: sum the widths of all dimension columns
+                int l = info.codeSystem.maxCodeLength(i); // presumably the max encoded byte length of column i; confirm against the code system
+                keyLength += l;
+            }
+
+            boolean[] mask = new boolean[keyLength];
+            int p = 0; // write position in mask, advanced column by column
+            for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) { // second pass: same column order as the first
+                int l = info.codeSystem.maxCodeLength(i);
+                boolean m = groupBy.get(i) ? true : false; // only columns in groupBy are flagged for comparison
+                for (int j = 0; j < l; j++) {
+                    mask[p++] = m; // every byte of the column gets the column's flag
+                }
+            }
+            return mask;
+        }
+
         private byte[] createKey(GTRecord record) {
-            byte[] result = new byte[info.getMaxColumnLength(groupBy)];
+            byte[] result = new byte[keyLength];
             int offset = 0;
-            for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+            for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
                 final ByteArray byteArray = record.cols[i];
                 final int columnLength = info.codeSystem.maxCodeLength(i);
                 System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, byteArray.length());
@@ -129,13 +156,7 @@ public class GTAggregateScanner implements IGTScanner {
                 final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
 
                 final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
-                final GTRecord secondRecord;
-
-                {
-                    BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
-                    dimensionsAndMetrics.or(metrics);
-                    secondRecord = new GTRecord(info, dimensionsAndMetrics);
-                }
+                final GTRecord secondRecord = new GTRecord(info);
 
                 @Override
                 public boolean hasNext() {
@@ -151,7 +172,7 @@ public class GTAggregateScanner implements IGTScanner {
 
                 private void create(byte[] key, MeasureAggregator[] value) {
                     int offset = 0;
-                    for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+                    for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
                         final int columnLength = info.codeSystem.maxCodeLength(i);
                         secondRecord.set(i, new ByteArray(key, offset, columnLength));
                         offset += columnLength;
@@ -173,103 +194,4 @@ public class GTAggregateScanner implements IGTScanner {
         }
     }
 
-    /*
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    class AggregationCache {
-        final SortedMap<GTRecord, MeasureAggregator[]> aggBufMap;
-
-        public AggregationCache() {
-            this.aggBufMap = Maps.newTreeMap();
-        }
-
-        void aggregate(GTRecord r) {
-            r.maskForEqualHashComp(groupBy);
-            MeasureAggregator[] aggrs = aggBufMap.get(r);
-            if (aggrs == null) {
-                aggrs = new MeasureAggregator[metricsAggrFuncs.length];
-                for (int i = 0, col = -1; i < aggrs.length; i++) {
-                    col = metrics.nextSetBit(col + 1);
-                    aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
-                }
-                aggBufMap.put(r.copy(dimensions), aggrs);
-            }
-
-            for (int i = 0, col = -1; i < aggrs.length; i++) {
-                col = metrics.nextSetBit(col + 1);
-                Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
-                aggrs[i].aggregate(metrics);
-            }
-        }
-
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-                
-                final Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-
-                final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
-                final GTRecord oneRecord;  // avoid instance creation
-
-                {
-                    BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
-                    dimensionsAndMetrics.or(metrics);
-                    oneRecord = new GTRecord(info, dimensionsAndMetrics);
-                }
-
-                @Override
-                public boolean hasNext() {
-                    return it.hasNext();
-                }
-
-                @Override
-                public GTRecord next() {
-                    Entry<GTRecord, MeasureAggregator[]> entry = it.next();
-
-                    GTRecord dims = entry.getKey();
-                    for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
-                        oneRecord.cols[i].set(dims.cols[i]);
-                    }
-                    
-                    metricsBuf.clear();
-                    MeasureAggregator[] aggrs = entry.getValue();
-                    for (int i = 0, col = -1; i < aggrs.length; i++) {
-                        col = metrics.nextSetBit(col + 1);
-                        int pos = metricsBuf.position();
-                        info.codeSystem.encodeColumnValue(col, aggrs[i].getState(), metricsBuf);
-                        oneRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
-                    }
-                    return oneRecord;
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-
-        transient int rowMemBytes;
-        static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
-
-        public void checkMemoryUsage() {
-            // about memory calculation,
-            // http://seniorjava.wordpress.com/2013/09/01/java-objects-memory-size-reference/
-            if (rowMemBytes <= 0) {
-                if (aggBufMap.size() > 0) {
-                    rowMemBytes = 0;
-                    MeasureAggregator[] measureAggregators = aggBufMap.get(aggBufMap.firstKey());
-                    for (MeasureAggregator agg : measureAggregators) {
-                        rowMemBytes += agg.getMemBytes();
-                    }
-                }
-            }
-            int size = aggBufMap.size();
-            int memUsage = (40 + rowMemBytes) * size;
-            if (memUsage > MEMORY_USAGE_CAP) {
-                throw new RuntimeException("Kylin coprocess memory usage goes beyond cap, (40 + " + rowMemBytes + ") * " + size + " > " + MEMORY_USAGE_CAP + ". Abord coprocessor.");
-            }
-        }
-    }
-
-    */
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/db6cbe28/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index 3607175..af47b89 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -69,7 +69,7 @@ public class DictGridTableTest {
             List<GTScanRange> r = planner.planScanRanges(filter);
             assertEquals(1, r.size());
             assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString());
-            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).hbaseFuzzyKeys.toString());
+            assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString());
         }
         
         // pre-evaluate ever false