You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/03 03:47:02 UTC

incubator-kylin git commit: KYLIN-810, LongSerializer not thread-safe, causes incorrect count() result

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 2e6d27c89 -> b75a24006


KYLIN-810, LongSerializer not thread-safe, causes incorrect count() result


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b75a2400
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b75a2400
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b75a2400

Branch: refs/heads/0.8.0
Commit: b75a240062aa4205d13f33e4af2881cd1a424f56
Parents: 2e6d27c
Author: Yang Li <li...@apache.org>
Authored: Wed Jun 3 09:46:42 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Jun 3 09:46:52 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/HadoopUtil.java    | 10 +--
 .../kylin/job/inmemcubing/InMemCubeBuilder.java | 68 ++++++++++++++++----
 .../inmemcubing/ConcurrentDiskStoreTest.java    |  2 +-
 .../serializer/BigDecimalSerializer.java        |  7 +-
 .../metadata/serializer/DateTimeSerializer.java | 25 +++++--
 .../metadata/serializer/DoubleSerializer.java   | 27 +++++---
 .../metadata/serializer/HLLCSerializer.java     | 34 +++++++---
 .../metadata/serializer/LongSerializer.java     | 27 +++++---
 .../java/org/apache/kylin/rest/DebugTomcat.java |  3 +-
 .../kylin/storage/cube/CubeCodeSystem.java      | 18 +++++-
 .../storage/gridtable/GTAggregateScanner.java   | 63 +++++++++++++-----
 11 files changed, 210 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 2486d6c..8ee1440 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -62,11 +62,13 @@ public class HadoopUtil {
             throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
         }
     }
-    
+
     public static String fixWindowsPath(String path) {
         // fix windows path
         if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
             path = path.replace("file://", "file:///");
+        }
+        if (path.startsWith("file:///")) {
             path = path.replace('\\', '/');
         }
         return path;
@@ -134,8 +136,6 @@ public class HadoopUtil {
         String znodePath = m.group(3);
         conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodePath);
 
-
-
         return conf;
     }
 
@@ -148,7 +148,7 @@ public class HadoopUtil {
         int cut = table.indexOf('.');
         String database = cut >= 0 ? table.substring(0, cut).trim() : "DEFAULT";
         String tableName = cut >= 0 ? table.substring(cut + 1).trim() : table.trim();
-        
-        return new String[] {database, tableName};
+
+        return new String[] { database, tableName };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 9e98976..a8e6d02 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -22,14 +22,17 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
@@ -93,11 +96,13 @@ public class InMemCubeBuilder implements Runnable {
     private AtomicInteger taskCuboidCompleted;
     private CuboidResult baseResult;
 
-    private TreeMap<Long, CuboidResult> outputPending;
+    private SortedMap<Long, CuboidResult> outputPending;
     private Thread outputThread;
     private Throwable outputThreadException;
     private int outputCuboidExpected;
 
+    private Object[] totalSumForSanityCheck;
+
     public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap, ICuboidWriter gtRecordWriter) {
         if (dictionaryMap == null || dictionaryMap.isEmpty()) {
             throw new IllegalArgumentException("dictionary cannot be empty");
@@ -158,12 +163,12 @@ public class InMemCubeBuilder implements Runnable {
 
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
-        
+
         // Before several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
         // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
         // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
         ConcurrentDiskStore store = new ConcurrentDiskStore(info);
-        
+
         GridTable gridTable = new GridTable(info, store);
         return gridTable;
     }
@@ -289,13 +294,17 @@ public class InMemCubeBuilder implements Runnable {
                 while (taskCuboidCompleted.get() < outputCuboidExpected) {
                     CuboidTask task = null;
                     synchronized (taskPending) {
-                        while (task == null) {
+                        while (task == null && taskHasNoException()) {
                             task = taskPending.pollFirst();
                             if (task == null)
-                                taskPending.wait();
+                                taskPending.wait(60000);
                         }
                     }
 
+                    // if task error occurs
+                    if (task == null)
+                        break;
+
                     CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
                     addChildTasks(newCuboid);
                     taskCuboidCompleted.incrementAndGet();
@@ -308,12 +317,21 @@ public class InMemCubeBuilder implements Runnable {
                     }
                 }
             } catch (Throwable ex) {
-                if (taskCuboidCompleted.get() < outputCuboidExpected)
+                if (taskCuboidCompleted.get() < outputCuboidExpected) {
+                    logger.error("task thread exception", ex);
                     taskThreadExceptions[id] = ex;
+                }
             }
         }
     }
 
+    private boolean taskHasNoException() {
+        for (int i = 0; i < taskThreadExceptions.length; i++)
+            if (taskThreadExceptions[i] != null)
+                return false;
+        return true;
+    }
+
     private void addChildTasks(CuboidResult parent) {
         List<Long> children = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
         if (!children.isEmpty()) {
@@ -326,10 +344,10 @@ public class InMemCubeBuilder implements Runnable {
         }
     }
 
-    private TreeMap<Long, CuboidResult> prepareOutputPending() {
+    private SortedMap<Long, CuboidResult> prepareOutputPending() {
         TreeMap<Long, CuboidResult> result = new TreeMap<Long, CuboidResult>();
         prepareOutputPendingRecursive(Cuboid.getBaseCuboidId(cubeDesc), result);
-        return result;
+        return Collections.synchronizedSortedMap(result);
     }
 
     private void prepareOutputPendingRecursive(Long cuboidId, TreeMap<Long, CuboidResult> result) {
@@ -344,20 +362,26 @@ public class InMemCubeBuilder implements Runnable {
             public void run() {
                 try {
                     while (!outputPending.isEmpty()) {
-                        CuboidResult result = outputPending.firstEntry().getValue();
+                        CuboidResult result = outputPending.get(outputPending.firstKey());
                         synchronized (result) {
-                            while (result.table == null) {
+                            while (result.table == null && taskHasNoException()) {
                                 try {
-                                    result.wait();
+                                    result.wait(60000);
                                 } catch (InterruptedException e) {
                                     logger.error("interrupted", e);
                                 }
                             }
                         }
+
+                        // if task error occurs
+                        if (result.table == null)
+                            break;
+
                         outputCuboid(result.cuboidId, result.table);
                         outputPending.remove(result.cuboidId);
                     }
                 } catch (Throwable ex) {
+                    logger.error("output thread exception", ex);
                     outputThreadException = ex;
                 }
             }
@@ -498,7 +522,7 @@ public class InMemCubeBuilder implements Runnable {
         logger.info("Calculating cuboid " + cuboidId);
 
         GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
-        IGTScanner scanner = gridTable.scan(req);
+        GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
         GridTable newGridTable = newGridTableByCuboidID(cuboidId);
         GTBuilder builder = newGridTable.rebuild();
 
@@ -546,6 +570,8 @@ public class InMemCubeBuilder implements Runnable {
 
                 builder.write(newRecord);
             }
+
+            sanityCheck(scanner.getTotalSumForSanityCheck());
         } finally {
             scanner.close();
             builder.close();
@@ -557,6 +583,24 @@ public class InMemCubeBuilder implements Runnable {
         return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
     }
 
+    private void sanityCheck(Object[] totalSum) {
+        // double sum introduces error and causes result not exactly equal
+        for (int i = 0; i < totalSum.length; i++) {
+            if (totalSum[i] instanceof DoubleWritable) {
+                totalSum[i] = Math.round(((DoubleWritable) totalSum[i]).get());
+            }
+        }
+        logger.info(Arrays.toString(totalSum));
+
+        if (totalSumForSanityCheck == null) {
+            totalSumForSanityCheck = totalSum;
+            return;
+        }
+        if (Arrays.equals(totalSumForSanityCheck, totalSum) == false) {
+            throw new IllegalStateException();
+        }
+    }
+
     private void outputCuboid(long cuboidId, GridTable gridTable) throws IOException {
         long startTime = System.currentTimeMillis();
         GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
index 7aa4640..2fbcd94 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
@@ -47,7 +47,7 @@ public class ConcurrentDiskStoreTest {
     @Test
     public void testMultiThreadRead() throws IOException, InterruptedException {
         long start = System.currentTimeMillis();
-        verifyOneTableWriteAndRead(5);
+        verifyOneTableWriteAndRead(20);
         long end = System.currentTimeMillis();
         System.out.println("Cost " + (end - start) + " millis");
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
index af67112..72e3696 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
@@ -38,7 +38,8 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
     
     final DataType type;
     final int maxLength;
-    int counter = 0;
+    
+    int avoidVerbose = 0;
     
     public BigDecimalSerializer(DataType type) {
         this.type = type;
@@ -49,8 +50,8 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
     @Override
     public void serialize(BigDecimal value, ByteBuffer out) {
         if (value.scale() > type.getScale()) {
-            if (counter % 10000 == 0) {
-                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (counter++));
+            if (avoidVerbose % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
             }
             value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
index f817693..23f7695 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
@@ -9,8 +9,8 @@ import org.apache.kylin.common.util.DateFormat;
 
 public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
     
-    // avoid mass object creation
-    LongWritable current = new LongWritable();
+    // be thread-safe and avoid repeated obj creation
+    private static ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>();
 
     public DateTimeSerializer(DataType type) {
     }
@@ -20,10 +20,20 @@ public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
         out.putLong(value.get());
     }
 
+    private LongWritable current() {
+        LongWritable l = current.get();
+        if (l == null) {
+            l = new LongWritable();
+            current.set(l);
+        }
+        return l;
+    }
+    
     @Override
     public LongWritable deserialize(ByteBuffer in) {
-        current.set(in.getLong());
-        return current;
+        LongWritable l = current();
+        l.set(in.getLong());
+        return l;
     }
 
     @Override
@@ -38,11 +48,12 @@ public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
 
     @Override
     public LongWritable valueOf(byte[] value) {
+        LongWritable l = current();
         if (value == null)
-            current.set(0L);
+            l.set(0L);
         else
-            current.set(DateFormat.stringToMillis(Bytes.toString(value)));
-        return current;
+            l.set(DateFormat.stringToMillis(Bytes.toString(value)));
+        return l;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
index 0ebeb78..adafcc0 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
@@ -20,8 +20,8 @@ package org.apache.kylin.metadata.serializer;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.metadata.model.DataType;
 
 /**
@@ -30,8 +30,8 @@ import org.apache.kylin.metadata.model.DataType;
  */
 public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
 
-    // avoid mass object creation
-    DoubleWritable current = new DoubleWritable();
+    // be thread-safe and avoid repeated obj creation
+    private static ThreadLocal<DoubleWritable> current = new ThreadLocal<DoubleWritable>();
 
     public DoubleSerializer(DataType type) {
     }
@@ -41,10 +41,20 @@ public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
         out.putDouble(value.get());
     }
 
+    private DoubleWritable current() {
+        DoubleWritable d = current.get();
+        if (d == null) {
+            d = new DoubleWritable();
+            current.set(d);
+        }
+        return d;
+    }
+    
     @Override
     public DoubleWritable deserialize(ByteBuffer in) {
-        current.set(in.getDouble());
-        return current;
+        DoubleWritable d = current();
+        d.set(in.getDouble());
+        return d;
     }
 
     @Override
@@ -59,11 +69,12 @@ public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
     
     @Override
     public DoubleWritable valueOf(byte[] value) {
+        DoubleWritable d = current();
         if (value == null)
-            current.set(0d);
+            d.set(0d);
         else
-            current.set(Double.parseDouble(Bytes.toString(value)));
-        return current;
+            d.set(Double.parseDouble(Bytes.toString(value)));
+        return d;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
index 3f39866..1b9d225 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
@@ -30,10 +30,13 @@ import org.apache.kylin.metadata.model.DataType;
  */
 public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
 
-    HyperLogLogPlusCounter current;
+    // be thread-safe and avoid repeated obj creation
+    private static ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+    
+    private int precision;
 
     public HLLCSerializer(DataType type) {
-        current = new HyperLogLogPlusCounter(type.getPrecision());
+        this.precision = type.getPrecision();
     }
 
     @Override
@@ -45,34 +48,45 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
         }
     }
 
+    private HyperLogLogPlusCounter current() {
+        HyperLogLogPlusCounter hllc = current.get();
+        if (hllc == null) {
+            hllc = new HyperLogLogPlusCounter(precision);
+            current.set(hllc);
+        }
+        return hllc;
+    }
+    
     @Override
     public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+        HyperLogLogPlusCounter hllc = current();
         try {
-            current.readRegisters(in);
+            hllc.readRegisters(in);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return current;
+        return hllc;
     }
 
     @Override
     public int peekLength(ByteBuffer in) {
-        return current.peekLength(in);
+        return current().peekLength(in);
     }
     
     @Override
     public int maxLength() {
-        return current.maxLength();
+        return current().maxLength();
     }
 
     @Override
     public HyperLogLogPlusCounter valueOf(byte[] value) {
-        current.clear();
+        HyperLogLogPlusCounter hllc = current();
+        hllc.clear();
         if (value == null)
-            current.add("__nUlL__");
+            hllc.add("__nUlL__");
         else
-            current.add(value);
-        return current;
+            hllc.add(value);
+        return hllc;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
index ba9ff8b..28b009f 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
@@ -20,8 +20,8 @@ package org.apache.kylin.metadata.serializer;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.model.DataType;
 
@@ -31,8 +31,8 @@ import org.apache.kylin.metadata.model.DataType;
  */
 public class LongSerializer extends DataTypeSerializer<LongWritable> {
 
-    // avoid mass object creation
-    LongWritable current = new LongWritable();
+    // be thread-safe and avoid repeated obj creation
+    private static ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>();
 
     public LongSerializer(DataType type) {
     }
@@ -42,10 +42,20 @@ public class LongSerializer extends DataTypeSerializer<LongWritable> {
         BytesUtil.writeVLong(value.get(), out);
     }
 
+    private LongWritable current() {
+        LongWritable l = current.get();
+        if (l == null) {
+            l = new LongWritable();
+            current.set(l);
+        }
+        return l;
+    }
+    
     @Override
     public LongWritable deserialize(ByteBuffer in) {
-        current.set(BytesUtil.readVLong(in));
-        return current;
+        LongWritable l = current();
+        l.set(BytesUtil.readVLong(in));
+        return l;
     }
 
     @Override
@@ -66,11 +76,12 @@ public class LongSerializer extends DataTypeSerializer<LongWritable> {
 
     @Override
     public LongWritable valueOf(byte[] value) {
+        LongWritable l = current();
         if (value == null)
-            current.set(0L);
+            l.set(0L);
         else
-            current.set(Long.parseLong(Bytes.toString(value)));
-        return current;
+            l.set(Long.parseLong(Bytes.toString(value)));
+        return l;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 0b5db67..bbdde7a 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -38,8 +38,9 @@ public class DebugTomcat {
             // test_case_data/sandbox/ contains HDP 2.2 site xmls which is dev sandbox
             ClasspathUtil.addClasspath(new File("../examples/test_case_data/sandbox").getAbsolutePath());
             System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-            System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
             System.setProperty("spring.profiles.active", "testing");
+            if (System.getProperty("hdp.version") == null)
+                System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
 
             // workaround for job submission from win to linux -- https://issues.apache.org/jira/browse/MAPREDUCE-4052
             if (Shell.WINDOWS) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
index 881e7a1..68b46b5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
@@ -6,8 +6,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dict.Dictionary;
@@ -196,16 +196,27 @@ public class CubeCodeSystem implements IGTCodeSystem {
 
     static class FixLenSerializer extends DataTypeSerializer {
 
+        // be thread-safe and avoid repeated obj creation
+        private static ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
+
         private int fixLen;
-        private byte[] buf;
 
         FixLenSerializer(int fixLen) {
             this.fixLen = fixLen;
-            this.buf = new byte[fixLen];
+        }
+        
+        private byte[] currentBuf() {
+            byte[] buf = current.get();
+            if (buf == null) {
+                buf = new byte[fixLen];
+                current.set(buf);
+            }
+            return buf;
         }
 
         @Override
         public void serialize(Object value, ByteBuffer out) {
+            byte[] buf = currentBuf();
             if (value == null) {
                 Arrays.fill(buf, Dictionary.NULL);
                 out.put(buf);
@@ -223,6 +234,7 @@ public class CubeCodeSystem implements IGTCodeSystem {
 
         @Override
         public Object deserialize(ByteBuffer in) {
+            byte[] buf = currentBuf();
             in.get(buf);
 
             int tail = fixLen;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b75a2400/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index c34adc7..6005d2d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -9,6 +9,8 @@ import java.util.SortedMap;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.HLLCAggregator;
+import org.apache.kylin.metadata.measure.LDCAggregator;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,8 +29,7 @@ public class GTAggregateScanner implements IGTScanner {
     final ImmutableBitSet metrics;
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
-    
-    long estimateSizeOfAggrCache;
+    final AggregationCache aggrCache;
 
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
         if (req.hasAggregation() == false)
@@ -40,6 +41,7 @@ public class GTAggregateScanner implements IGTScanner {
         this.metrics = req.getAggrMetrics();
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
         this.inputScanner = inputScanner;
+        this.aggrCache = new AggregationCache();
     }
 
     @Override
@@ -64,19 +66,19 @@ public class GTAggregateScanner implements IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        AggregationCache aggrCache = new AggregationCache();
         for (GTRecord r : inputScanner) {
             aggrCache.aggregate(r);
         }
-        
-        estimateSizeOfAggrCache = aggrCache.esitmateMemSize();
-        
         return aggrCache.iterator();
     }
-    
-    /** for last call to iterator(), return the estimate memory size of its aggregation cache */
+
+    /** return the estimate memory size of aggregation cache */
     public long getEstimateSizeOfAggrCache() {
-        return estimateSizeOfAggrCache;
+        return aggrCache.esitmateMemSize();
+    }
+
+    public Object[] getTotalSumForSanityCheck() {
+        return aggrCache.calculateTotalSumSanityCheck();
     }
 
     class AggregationCache {
@@ -149,11 +151,7 @@ public class GTAggregateScanner implements IGTScanner {
             final byte[] key = createKey(r);
             MeasureAggregator[] aggrs = aggBufMap.get(key);
             if (aggrs == null) {
-                aggrs = new MeasureAggregator[metricsAggrFuncs.length];
-                for (int i = 0; i < aggrs.length; i++) {
-                    int col = metrics.trueBitAt(i);
-                    aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
-                }
+                aggrs = newAggregators();
                 aggBufMap.put(key, aggrs);
             }
             for (int i = 0; i < aggrs.length; i++) {
@@ -162,11 +160,44 @@ public class GTAggregateScanner implements IGTScanner {
                 aggrs[i].aggregate(metrics);
             }
         }
-        
+
+        private MeasureAggregator[] newAggregators() {
+            MeasureAggregator[] aggrs;
+            aggrs = new MeasureAggregator[metricsAggrFuncs.length];
+            for (int i = 0; i < aggrs.length; i++) {
+                int col = metrics.trueBitAt(i);
+                aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
+            }
+            return aggrs;
+        }
+
+        public Object[] calculateTotalSumSanityCheck() {
+            MeasureAggregator[] totalSum = newAggregators();
+
+            // skip expensive aggregation
+            for (int i = 0; i < totalSum.length; i++) {
+                if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator)
+                    totalSum[i] = null;
+            }
+
+            for (MeasureAggregator[] entry : aggBufMap.values()) {
+                for (int i = 0; i < totalSum.length; i++) {
+                    if (totalSum[i] != null)
+                        totalSum[i].aggregate(entry[i].getState());
+                }
+            }
+            Object[] result = new Object[totalSum.length];
+            for (int i = 0; i < totalSum.length; i++) {
+                if (totalSum[i] != null)
+                    result[i] = totalSum[i].getState();
+            }
+            return result;
+        }
+
         public long esitmateMemSize() {
             if (aggBufMap.isEmpty())
                 return 0;
-            
+
             byte[] sampleKey = aggBufMap.firstKey();
             MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
             return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());