You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/30 14:52:08 UTC

incubator-kylin git commit: KYLIN-668, InMemCubeBuilder move to job module

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 3d6a036b1 -> b4b108842


KYLIN-668, InMemCubeBuilder move to job module


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b4b10884
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b4b10884
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b4b10884

Branch: refs/heads/0.8.0
Commit: b4b1088423235cdedc43daecaf051beeb6b4ce98
Parents: 3d6a036
Author: Yang Li <li...@apache.org>
Authored: Sat May 30 20:51:28 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat May 30 20:51:28 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     |  10 +-
 .../common/util/MemoryBudgetControllerTest.java |   2 +-
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |  21 +-
 .../hadoop/cubev2/MapContextGTRecordWriter.java |  19 +-
 .../kylin/job/streaming/CubeStreamBuilder.java  |  55 +-
 .../kylin/job/BuildCubeWithStreamTest.java      |  48 +-
 .../job/inmemcubing/InMemCubeBuilderTest.java   |   8 +-
 .../kylin/metadata/measure/HLLCAggregator.java  |   2 +-
 .../storage/gridtable/GTAggregateScanner.java   |  25 +-
 .../gridtable/memstore/GTMemDiskStore.java      | 305 ++++++----
 .../kylin/streaming/cube/IGTRecordWriter.java   |  11 -
 .../kylin/streaming/cube/InMemCubeBuilder.java  | 550 -------------------
 .../cube/InMemCubeBuilderBenchmarkTest.java     | 117 ----
 13 files changed, 292 insertions(+), 881 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index 03496e7..82e503c 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -8,8 +8,6 @@ import org.slf4j.LoggerFactory;
 
 public class MemoryBudgetController {
 
-    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
-
     public static interface MemoryConsumer {
         // return number MB released
         int freeUp(int mb);
@@ -28,8 +26,8 @@ public class MemoryBudgetController {
         }
     }
 
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
     public static final int ONE_MB = 1024 * 1024;
-    public static final int SYSTEM_RESERVED = 200;
 
     private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
 
@@ -126,7 +124,7 @@ public class MemoryBudgetController {
     }
 
     private boolean checkSystemAvailMB(int mb) {
-        return getSystemAvailMB() - SYSTEM_RESERVED >= mb;
+        return getSystemAvailMB() >= mb;
     }
 
     public static long getSystemAvailBytes() {
@@ -143,8 +141,4 @@ public class MemoryBudgetController {
         return (int) (getSystemAvailBytes() / ONE_MB);
     }
 
-    public static int getMaxPossibleBudget() {
-        return getSystemAvailMB() - SYSTEM_RESERVED;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
index bc2eadd..a523263 100644
--- a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
@@ -11,7 +11,7 @@ public class MemoryBudgetControllerTest {
 
     @Test
     public void test() {
-        int n = MemoryBudgetController.getMaxPossibleBudget() / 2;
+        int n = MemoryBudgetController.getSystemAvailMB() / 2;
         MemoryBudgetController mbc = new MemoryBudgetController(n);
 
         ArrayList<OneMB> mbList = new ArrayList<OneMB>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 4efff16..fdb4823 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -1,6 +1,15 @@
 package org.apache.kylin.job.hadoop.cubev2;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -18,15 +27,11 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.lookup.HiveTableReader;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
+import com.google.common.collect.Maps;
 
 /**
  */
@@ -70,7 +75,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
             }
         }
 
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment));
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment));
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         future = executorService.submit(cubeBuilder);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 283bed6..6fcd01a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -1,6 +1,9 @@
 package org.apache.kylin.job.hadoop.cubev2;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -11,21 +14,13 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.List;
 
 /**
  */
-public class MapContextGTRecordWriter implements IGTRecordWriter {
+public class MapContextGTRecordWriter implements ICuboidWriter {
 
     private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
     protected MapContext<?, ?, ImmutableBytesWritable, Text> mapContext;
@@ -52,7 +47,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
     }
 
     @Override
-    public void write(Long cuboidId, GTRecord record) throws IOException {
+    public void write(long cuboidId, GTRecord record) throws IOException {
 
         if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
             // output another cuboid

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 0bd2792..be44e18 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -1,13 +1,21 @@
 package org.apache.kylin.job.streaming;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +50,8 @@ import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
 import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
 import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
@@ -49,19 +59,17 @@ import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.streaming.SEOJsonStreamParser;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 
 /**
  */
@@ -111,7 +119,7 @@ public class CubeStreamBuilder extends StreamBuilder {
         final HTableInterface hTable = createHTable(cubeSegment);
 
         final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
-        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(),
                 dictionaryMap, gtRecordWriter);
 
         executorService.submit(inMemCubeBuilder).get();
@@ -144,7 +152,7 @@ public class CubeStreamBuilder extends StreamBuilder {
         }
     }
 
-    private class CubeStreamRecordWriter implements IGTRecordWriter {
+    private class CubeStreamRecordWriter implements ICuboidWriter {
         final List<InMemKeyValueCreator> keyValueCreators;
         final int nColumns;
         final HTableInterface hTable;
@@ -185,7 +193,7 @@ public class CubeStreamBuilder extends StreamBuilder {
         }
 
         @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
+        public void write(long cuboidId, GTRecord record) throws IOException {
             final ByteBuffer key = createKey(cuboidId, record);
             final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId));
             final BitSet bitSet = new BitSet();
@@ -380,11 +388,6 @@ public class CubeStreamBuilder extends StreamBuilder {
         return hTable;
     }
 
-    private void loadToHTable(String hTableName) throws IOException {
-        final HTableInterface table = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
-
-    }
-
     @Override
     protected void onStop() {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 319d7fa..9dca76a 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,11 +34,18 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
@@ -55,25 +62,24 @@ import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.job.inmemcubing.ICuboidWriter;
+import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.cube.IGTRecordWriter;
-import org.apache.kylin.streaming.cube.InMemCubeBuilder;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
 
 /**
  *
@@ -145,7 +151,7 @@ public class BuildCubeWithStreamTest {
 
         ArrayBlockingQueue queue = new ArrayBlockingQueue<List<String>>(10000);
 
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         Future<?> future = executorService.submit(cubeBuilder);
 
@@ -211,12 +217,12 @@ public class BuildCubeWithStreamTest {
     }
 
 
-    class ConsoleGTRecordWriter implements IGTRecordWriter {
+    class ConsoleGTRecordWriter implements ICuboidWriter {
 
         boolean verbose = false;
 
         @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
+        public void write(long cuboidId, GTRecord record) throws IOException {
             if (verbose)
                 System.out.println(record.toString());
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index b5924c2..0a0c97c 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -81,11 +81,11 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         Map<TblColRef, Dictionary<?>> dictionaryMap = getDictionaryMap(cube, flatTable);
         ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
 
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, dictionaryMap, new ConsoleGTRecordWriter());
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         Future<?> future = executorService.submit(cubeBuilder);
 
-        feedData(cube, flatTable, queue, 70000);
+        feedData(cube, flatTable, queue, 60000);
 
         try {
             future.get();
@@ -93,8 +93,6 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
             logger.error("stream build failed", e);
             throw new IOException("Failed to build cube ", e);
         }
-
-        logger.info("stream build finished");
     }
 
     private void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException {
@@ -170,7 +168,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         boolean verbose = false;
 
         @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
+        public void write(long cuboidId, GTRecord record) throws IOException {
             if (verbose)
                 System.out.println(record.toString());
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
index b07fd80..a168743 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
@@ -58,7 +58,7 @@ public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
                 + 4 // precision
                 + 8 // ref to HLLC
                 + 8 // HLLC obj shell
-                + 32 + (1 >> precision); // HLLC internal
+                + 32 + (1 << precision); // HLLC internal
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index ec92f92..fcff4c0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -28,6 +28,8 @@ public class GTAggregateScanner implements IGTScanner {
     final BitSet metrics;
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
+    
+    long estimateSizeOfAggrCache;
 
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
         if (req.hasAggregation() == false)
@@ -64,11 +66,19 @@ public class GTAggregateScanner implements IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        AggregationCache aggregationCacheWithBytesKey = new AggregationCache();
+        AggregationCache aggrCache = new AggregationCache();
         for (GTRecord r : inputScanner) {
-            aggregationCacheWithBytesKey.aggregate(r);
+            aggrCache.aggregate(r);
         }
-        return aggregationCacheWithBytesKey.iterator();
+        
+        estimateSizeOfAggrCache = aggrCache.esitmateMemSize();
+        
+        return aggrCache.iterator();
+    }
+    
 +    /** for last call to iterator(), return the estimated memory size of its aggregation cache */
+    public long getEstimateSizeOfAggrCache() {
+        return estimateSizeOfAggrCache;
     }
 
     class AggregationCache {
@@ -148,6 +158,15 @@ public class GTAggregateScanner implements IGTScanner {
                 aggrs[i].aggregate(metrics);
             }
         }
+        
+        public long esitmateMemSize() {
+            if (aggBufMap.isEmpty())
+                return 0;
+            
+            byte[] sampleKey = aggBufMap.firstKey();
+            MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
+            return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());
+        }
 
         public Iterator<GTRecord> iterator() {
             return new Iterator<GTRecord>() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index 4cc2f85..0b2c0a4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -34,12 +34,15 @@ public class GTMemDiskStore implements IGTStore, Closeable {
     private static final boolean debug = false;
 
     private static final int STREAM_BUFFER_SIZE = 8192;
-    private static final int MEM_CHUNK_SIZE_MB = 1;
+    private static final int MEM_CHUNK_SIZE_MB = 2;
 
-    final GTInfo info;
-    final MemPart memPart;
-    final DiskPart diskPart;
-    final boolean delOnClose;
+    private final GTInfo info;
+    private final Object lock; // all public methods that read/write object states are synchronized on this lock
+    private final MemPart memPart;
+    private final DiskPart diskPart;
+    private final boolean delOnClose;
+
+    private Writer ongoingWriter;
 
     public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
         this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
@@ -51,6 +54,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
     private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
         this.info = info;
+        this.lock = this;
         this.memPart = new MemPart(budgetCtrl);
         this.diskPart = new DiskPart(diskFile);
         this.delOnClose = delOnClose;
@@ -67,25 +71,44 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
     @Override
     public IGTStoreWriter rebuild(int shard) throws IOException {
-        return new Writer(0);
+        return newWriter(0);
     }
 
     @Override
     public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
-        return new Writer(diskPart.tailOffset);
+        return newWriter(length());
+    }
+
+    private Writer newWriter(long startOffset) throws IOException {
+        synchronized (lock) {
+            if (ongoingWriter != null)
+                throw new IllegalStateException();
+
+            ongoingWriter = new Writer(startOffset);
+            return ongoingWriter;
+        }
     }
 
     @Override
     public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
-        return new Reader();
+        synchronized (lock) {
+            return new Reader();
+        }
     }
 
     @Override
     public void close() throws IOException {
+        // synchronized inside the parts close()
         memPart.close();
         diskPart.close();
     }
 
+    public long length() {
+        synchronized (lock) {
+            return Math.max(memPart.tailOffset(), diskPart.tailOffset);
+        }
+    }
+
     @Override
     public String toString() {
         return "MemDiskStore@" + this.hashCode();
@@ -94,7 +117,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
     private class Reader implements IGTStoreScanner {
 
         final DataInputStream din;
-        long diskOffset = 0;
+        long readOffset = 0;
         long memRead = 0;
         long diskRead = 0;
         int nReadCalls = 0;
@@ -105,7 +128,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         Reader() throws IOException {
             diskPart.openRead();
             if (debug)
-                logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+                logger.debug(GTMemDiskStore.this + " read start @ " + readOffset);
 
             InputStream in = new InputStream() {
                 byte[] tmp = new byte[1];
@@ -122,47 +145,51 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
                 @Override
                 public int read(byte[] b, int off, int len) throws IOException {
-                    nReadCalls++;
-                    if (available() <= 0)
-                        return -1;
-
-                    if (memChunk == null && memPart.headOffset() <= diskOffset && diskOffset < memPart.tailOffset()) {
-                        memChunk = memPart.seekMemChunk(diskOffset);
-                    }
+                    synchronized (lock) {
+                        nReadCalls++;
+                        if (available() <= 0)
+                            return -1;
 
-                    int lenToGo = Math.min(available(), len);
+                        if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
+                            memChunk = memPart.seekMemChunk(readOffset);
+                        }
 
-                    int nRead = 0;
-                    while (lenToGo > 0) {
-                        int n;
-                        if (memChunk != null) {
-                            if (memChunk.headOffset() > diskOffset) {
-                                memChunk = null;
-                                continue;
+                        int lenToGo = Math.min(available(), len);
+
+                        int nRead = 0;
+                        while (lenToGo > 0) {
+                            int n;
+                            if (memChunk != null) {
+                                if (memChunk.headOffset() > readOffset) {
+                                    memChunk = null;
+                                    continue;
+                                }
+                                if (readOffset >= memChunk.tailOffset()) {
+                                    memChunk = memChunk.next;
+                                    continue;
+                                }
+                                int chunkOffset = (int) (readOffset - memChunk.headOffset());
+                                n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
+                                System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+                                memRead += n;
+                            } else {
+                                n = diskPart.read(readOffset, b, off, lenToGo);
+                                diskRead += n;
                             }
-                            if (diskOffset >= memChunk.tailOffset()) {
-                                memChunk = memChunk.next;
-                                continue;
-                            }
-                            int chunkOffset = (int) (diskOffset - memChunk.headOffset());
-                            n = Math.min((int) (memChunk.tailOffset() - diskOffset), lenToGo);
-                            System.arraycopy(memChunk.data, chunkOffset, b, off, n);
-                            memRead += n;
-                        } else {
-                            n = diskPart.read(diskOffset, b, off, lenToGo);
-                            diskRead += n;
+                            lenToGo -= n;
+                            nRead += n;
+                            off += n;
+                            readOffset += n;
                         }
-                        lenToGo -= n;
-                        nRead += n;
-                        off += n;
-                        diskOffset += n;
+                        return nRead;
                     }
-                    return nRead;
                 }
 
                 @Override
                 public int available() throws IOException {
-                    return (int) (diskPart.tailOffset - diskOffset);
+                    synchronized (lock) {
+                        return (int) (length() - readOffset);
+                    }
                 }
             };
 
@@ -205,10 +232,12 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         @Override
         public void close() throws IOException {
-            din.close();
-            diskPart.closeRead();
-            if (debug)
-                logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+            synchronized (lock) {
+                din.close();
+                diskPart.closeRead();
+                if (debug)
+                    logger.debug(GTMemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+            }
         }
 
     }
@@ -216,18 +245,19 @@ public class GTMemDiskStore implements IGTStore, Closeable {
     private class Writer implements IGTStoreWriter {
 
         final DataOutputStream dout;
-        long diskOffset;
+        long writeOffset;
         long memWrite = 0;
         long diskWrite = 0;
         int nWriteCalls;
+        boolean closed = false;
 
         Writer(long startOffset) throws IOException {
-            diskOffset = 0; // TODO does not support append yet
+            writeOffset = 0; // TODO does not support append yet
             memPart.clear();
             diskPart.clear();
             diskPart.openWrite(false);
             if (debug)
-                logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+                logger.debug(GTMemDiskStore.this + " write start @ " + writeOffset);
 
             memPart.activateMemWrite();
 
@@ -243,22 +273,23 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
                 @Override
                 public void write(byte[] bytes, int offset, int length) throws IOException {
+                    // lock inside memPart.write() and diskPart.write()
                     nWriteCalls++;
                     while (length > 0) {
                         int n;
                         if (memPartActivated) {
-                            n = memPart.write(bytes, offset, length, diskOffset);
+                            n = memPart.write(bytes, offset, length, writeOffset);
                             memWrite += n;
                             if (n == 0) {
                                 memPartActivated = false;
                             }
                         } else {
-                            n = diskPart.write(diskOffset, bytes, offset, length);
+                            n = diskPart.write(writeOffset, bytes, offset, length);
                             diskWrite += n;
                         }
                         offset += n;
                         length -= n;
-                        diskOffset += n;
+                        writeOffset += n;
                     }
                 }
             };
@@ -272,12 +303,23 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         @Override
         public void close() throws IOException {
-            dout.close();
-            memPart.finishAsyncFlush();
-            diskPart.closeWrite();
-            assert diskOffset == diskPart.tailOffset;
-            if (debug)
-                logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+            synchronized (lock) {
+                if (!closed) {
+                    dout.close();
+                    memPart.deactivateMemWrite();
+                }
+
+                if (memPart.asyncFlusher == null) {
+                    assert writeOffset == diskPart.tailOffset;
+                    diskPart.closeWrite();
+                    ongoingWriter = null;
+                    if (debug)
+                        logger.debug(GTMemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+                } else {
+                    // the asyncFlusher will call this close() again later
+                }
+                closed = true;
+            }
         }
     }
 
@@ -308,7 +350,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         final MemoryBudgetController budgetCtrl;
 
-        // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
+        // async flush thread checks this flag out of sync block
         volatile boolean writeActivated;
         MemChunk firstChunk;
         MemChunk lastChunk;
@@ -331,7 +373,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
             return lastChunk == null ? 0 : lastChunk.tailOffset();
         }
 
-        synchronized public MemChunk seekMemChunk(long diskOffset) {
+        public MemChunk seekMemChunk(long diskOffset) {
             MemChunk c = firstChunk;
             while (c != null && c.headOffset() <= diskOffset) {
                 if (diskOffset < c.tailOffset())
@@ -344,7 +386,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         public int write(byte[] bytes, int offset, int length, long diskOffset) {
             int needMoreMem = 0;
 
-            synchronized (this) {
+            synchronized (lock) {
                 if (writeActivated == false)
                     return 0;
 
@@ -356,7 +398,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                     needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
             }
 
-            // call to budgetCtrl.reserve() out of synchronized block, or deadlock may happen between MemoryConsumers
+            // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
             if (needMoreMem > 0) {
                 try {
                     budgetCtrl.reserve(this, needMoreMem);
@@ -366,8 +408,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                 }
             }
 
-            synchronized (this) {
-                if (needMoreMem > 0) {
+            synchronized (lock) {
+                if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
                     MemChunk chunk = new MemChunk();
                     chunk.diskOffset = diskOffset;
                     chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
@@ -409,11 +451,20 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                                 Thread.sleep(10);
                             }
                             flushToDisk();
+
+                            if (debug)
+                                logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+
+                            synchronized (lock) {
+                                asyncFlusher = null;
+                                asyncFlushChunk = null;
+                                if (ongoingWriter.closed) {
+                                    ongoingWriter.close(); // call writer.close() again to clean up
+                                }
+                            }
                         } catch (Throwable ex) {
                             asyncFlushException = ex;
                         }
-                        if (debug)
-                            logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
                     }
                 };
                 asyncFlusher.start();
@@ -428,7 +479,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
             while (true) {
                 data = null;
-                synchronized (memPart) {
+                synchronized (lock) {
                     asyncFlushDiskOffset += flushedLen; // bytes written in last loop
                     if (debug)
                         logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
@@ -449,44 +500,26 @@ public class GTMemDiskStore implements IGTStore, Closeable {
             }
         }
 
-        public void finishAsyncFlush() throws IOException {
-            deactivateMemWrite();
-            if (asyncFlusher != null) {
-                try {
-                    asyncFlusher.join();
-                } catch (InterruptedException e) {
-                    logger.warn("", e);
-                }
-                asyncFlusher = null;
-                asyncFlushChunk = null;
-
-                if (asyncFlushException != null) {
-                    if (asyncFlushException instanceof IOException)
-                        throw (IOException) asyncFlushException;
-                    else
-                        throw new IOException(asyncFlushException);
-                }
-            }
-        }
-
         @Override
-        synchronized public int freeUp(int mb) {
-            int mbReleased = 0;
-            while (chunkCount > 0 && mbReleased < mb) {
-                if (firstChunk == asyncFlushChunk)
-                    break;
-
-                mbReleased += MEM_CHUNK_SIZE_MB;
-                chunkCount--;
-                if (chunkCount == 0) {
-                    firstChunk = lastChunk = null;
-                } else {
-                    MemChunk next = firstChunk.next;
-                    firstChunk.next = null;
-                    firstChunk = next;
+        public int freeUp(int mb) {
+            synchronized (lock) {
+                int mbReleased = 0;
+                while (chunkCount > 0 && mbReleased < mb) {
+                    if (firstChunk == asyncFlushChunk)
+                        break;
+
+                    mbReleased += MEM_CHUNK_SIZE_MB;
+                    chunkCount--;
+                    if (chunkCount == 0) {
+                        firstChunk = lastChunk = null;
+                    } else {
+                        MemChunk next = firstChunk.next;
+                        firstChunk.next = null;
+                        firstChunk = next;
+                    }
                 }
+                return mbReleased;
             }
-            return mbReleased;
         }
 
         public void activateMemWrite() {
@@ -501,16 +534,38 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                 logger.debug(GTMemDiskStore.this + " mem write de-activated");
         }
 
-        synchronized public void clear() {
-            budgetCtrl.reserve(this, 0);
+        public void clear() {
             chunkCount = 0;
             firstChunk = lastChunk = null;
+            budgetCtrl.reserve(this, 0);
         }
 
         @Override
         public void close() throws IOException {
-            finishAsyncFlush();
-            clear();
+            synchronized (lock) {
+                if (asyncFlushException != null)
+                    throwAsyncException(asyncFlushException);
+            }
+            try {
+                asyncFlusher.join();
+            } catch (NullPointerException npe) {
+                // that's fine, async flusher may not present
+            } catch (InterruptedException e) {
+                logger.warn("async join interrupted", e);
+            }
+            synchronized (lock) {
+                if (asyncFlushException != null)
+                    throwAsyncException(asyncFlushException);
+                
+                clear();
+            }
+        }
+
+        private void throwAsyncException(Throwable ex) throws IOException {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else
+                throw new IOException(ex);
         }
 
         @Override
@@ -524,6 +579,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         final File diskFile;
         FileChannel writeChannel;
         FileChannel readChannel;
+        int readerCount = 0; // allow parallel readers
         long tailOffset;
 
         DiskPart(File diskFile) throws IOException {
@@ -534,8 +590,10 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         }
 
         public void openRead() throws IOException {
-            readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
-            tailOffset = diskFile.length();
+            if (readChannel == null) {
+                readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+            }
+            readerCount++;
         }
 
         public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
@@ -543,9 +601,16 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         }
 
         public void closeRead() throws IOException {
-            if (readChannel != null) {
-                readChannel.close();
-                readChannel = null;
+            closeRead(false);
+        }
+
+        private void closeRead(boolean force) throws IOException {
+            readerCount--;
+            if (readerCount == 0 || force) {
+                if (readChannel != null) {
+                    readChannel.close();
+                    readChannel = null;
+                }
             }
         }
 
@@ -561,9 +626,11 @@ public class GTMemDiskStore implements IGTStore, Closeable {
         }
 
         public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
-            tailOffset = Math.max(diskOffset + n, tailOffset);
-            return n;
+            synchronized (lock) {
+                int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+                tailOffset = Math.max(diskOffset + n, tailOffset);
+                return n;
+            }
         }
 
         public void closeWrite() throws IOException {
@@ -580,10 +647,12 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
         @Override
         public void close() throws IOException {
-            closeWrite();
-            closeRead();
-            if (delOnClose) {
-                diskFile.delete();
+            synchronized (lock) {
+                closeWrite();
+                closeRead(true);
+                if (delOnClose) {
+                    diskFile.delete();
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
deleted file mode 100644
index 2d1e97e..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.streaming.cube;
-
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-import java.io.IOException;
-
-/**
- */
-public interface IGTRecordWriter {
-    void write(Long cuboidId, GTRecord record) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
deleted file mode 100644
index 0b0dc4c..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
+++ /dev/null
@@ -1,550 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.kylin.streaming.cube;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- */
-public class InMemCubeBuilder implements Runnable {
-
-    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
-    private static final int DEFAULT_TIMEOUT = 25;
-
-    private BlockingQueue<List<String>> queue;
-    private CubeDesc desc = null;
-    private long baseCuboidId;
-    private CuboidScheduler cuboidScheduler = null;
-    private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-    private MeasureCodec measureCodec;
-    private String[] metricsAggrFuncs = null;
-    private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
-    public static final LongWritable ONE = new LongWritable(1l);
-    private int[] hbaseMeasureRefIndex;
-    private MeasureDesc[] measureDescs;
-    private int measureCount;
-
-    protected IGTRecordWriter gtRecordWriter;
-
-
-    /**
-     * @param queue
-     * @param cube
-     * @param dictionaryMap
-     * @param gtRecordWriter
-     */
-    public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
-        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
-            throw new IllegalArgumentException("dictionary cannot be empty");
-        }
-        this.queue = queue;
-        this.desc = cube.getDescriptor();
-        this.cuboidScheduler = new CuboidScheduler(desc);
-        this.dictionaryMap = dictionaryMap;
-        this.gtRecordWriter = gtRecordWriter;
-        this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
-        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        this.measureCodec = new MeasureCodec(desc.getMeasures());
-
-        Map<String, Integer> measureIndexMap = Maps.newHashMap();
-        List<String> metricsAggrFuncsList = Lists.newArrayList();
-        measureCount = desc.getMeasures().size();
-
-        List<MeasureDesc> measureDescsList = Lists.newArrayList();
-        hbaseMeasureRefIndex = new int[measureCount];
-        int measureRef = 0;
-        for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    for (int j = 0; j < measureCount; j++) {
-                        if (desc.getMeasures().get(j).equals(measure)) {
-                            measureDescsList.add(measure);
-                            hbaseMeasureRefIndex[measureRef] = j;
-                            break;
-                        }
-                    }
-                    measureRef++;
-                }
-            }
-        }
-
-        for (int i = 0; i < measureCount; i++) {
-            MeasureDesc measureDesc = measureDescsList.get(i);
-            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
-            measureIndexMap.put(measureDesc.getName(), i);
-        }
-        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
-        this.dependentMeasures = Maps.newHashMap();
-        for (int i = 0; i < measureCount; i++) {
-            String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
-            if (depMsrRef != null) {
-                int index = measureIndexMap.get(depMsrRef);
-                dependentMeasures.put(i, index);
-            }
-        }
-
-        this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
-    }
-
-
-    private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
-        GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
-        GTComboStore store = new GTComboStore(info, memStore);
-        GridTable gridTable = new GridTable(info, store);
-        return gridTable;
-    }
-
-    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
-        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
-        BitSet parentDimensions = columnBitSets.getFirst();
-        BitSet measureColumns = columnBitSets.getSecond();
-        BitSet childDimensions = (BitSet) parentDimensions.clone();
-
-        long mask = Long.highestOneBit(parentCuboidId);
-        long childCuboidId = cuboidId;
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
-        int index = 0;
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parentCuboidId) > 0) {
-                if ((mask & childCuboidId) == 0) {
-                    // this dim will be aggregated
-                    childDimensions.set(index, false);
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
-
-    }
-
-    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
-        IGTScanner scanner = gridTable.scan(req);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
-        GTBuilder builder = newGridTable.rebuild();
-
-        BitSet allNeededColumns = new BitSet();
-        allNeededColumns.or(aggregationColumns);
-        allNeededColumns.or(measureColumns);
-
-        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
-        int counter = 0;
-        ByteArray byteArray = new ByteArray(8);
-        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
-        try {
-            BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
-            for (Integer i : dependentMeasures.keySet()) {
-                dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
-            }
-
-            Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
-
-            for (GTRecord record : scanner) {
-                counter++;
-                for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
-                    newRecord.set(index, record.get(i));
-                }
-
-                if(dependentMeasures.size() > 0) {
-                    // update measures which have 'dependent_measure_ref'
-                    newRecord.getValues(dependentMetrics, hllObjects);
-
-                    for (Integer i : dependentMeasures.keySet()) {
-                        for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
-                            if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
-                                assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
-
-                                byteBuffer.clear();
-                                BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
-                                byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
-                                newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
-                            }
-                        }
-
-                    }
-                }
-
-                builder.write(newRecord);
-            }
-        } finally {
-            builder.close();
-        }
-        logger.info("Cuboid " + cuboidId + " has rows: " + counter);
-
-        return newGridTable;
-    }
-
-    private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
-        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
-        BitSet dimension = new BitSet();
-        dimension.set(0, bitSet.cardinality());
-        BitSet metrics = new BitSet();
-        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
-        return new Pair<BitSet, BitSet>(dimension, metrics);
-    }
-
-    private Object[] buildKey(List<String> row) {
-        int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
-        Object[] key = new Object[keySize];
-
-        for (int i = 0; i < keySize; i++) {
-            key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
-        }
-
-        return key;
-    }
-
-    private Object[] buildValue(List<String> row) {
-
-        Object[] values = new Object[measureCount];
-        MeasureDesc measureDesc = null;
-
-        for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
-            int i = hbaseMeasureRefIndex[position];
-            measureDesc = measureDescs[i];
-
-            Object value = null;
-            int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
-            FunctionDesc function = desc.getMeasures().get(i).getFunction();
-            if (function.isCount() || function.isHolisticCountDistinct()) {
-                // note for holistic count distinct, this value will be ignored
-                value = ONE;
-            } else if (flatTableIdx == null) {
-                value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
-            } else if (flatTableIdx.length == 1) {
-                value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
-            } else {
-
-                byte[] result = null;
-                for (int x = 0; x < flatTableIdx.length; x++) {
-                    byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
-                    if (result == null) {
-                        result = Arrays.copyOf(split, split.length);
-                    } else {
-                        byte[] newResult = new byte[result.length + split.length];
-                        System.arraycopy(result, 0, newResult, 0, result.length);
-                        System.arraycopy(split, 0, newResult, result.length, split.length);
-                        result = newResult;
-                    }
-                }
-                value = measureCodec.getSerializer(i).valueOf(result);
-            }
-            values[position] = value;
-        }
-        return values;
-    }
-
-
-    @Override
-    public void run() {
-        try {
-            logger.info("Create base cuboid " + baseCuboidId);
-            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
-
-            GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
-            final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
-
-            IGTScanner queueScanner = new IGTScanner() {
-
-                @Override
-                public Iterator<GTRecord> iterator() {
-                    return new Iterator<GTRecord>() {
-
-                        List<String> currentObject = null;
-
-                        @Override
-                        public boolean hasNext() {
-                            try {
-                                currentObject = queue.take();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException(e);
-                            }
-                            return currentObject != null && currentObject.size() > 0;
-                        }
-
-                        @Override
-                        public GTRecord next() {
-                            if (currentObject.size() == 0)
-                                throw new IllegalStateException();
-
-                            buildGTRecord(currentObject, baseGTRecord);
-                            return baseGTRecord;
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-
-                @Override
-                public void close() throws IOException {
-                }
-
-                @Override
-                public GTInfo getInfo() {
-                    return baseCuboidGT.getInfo();
-                }
-
-                @Override
-                public int getScannedRowCount() {
-                    return 0;
-                }
-
-                @Override
-                public int getScannedRowBlockCount() {
-                    return 0;
-                }
-            };
-
-            Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
-            GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
-            IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
-
-            int counter = 0;
-            for (GTRecord r : aggregationScanner) {
-                baseGTBuilder.write(r);
-                counter++;
-            }
-            baseGTBuilder.close();
-            aggregationScanner.close();
-
-            logger.info("Base cuboid has " + counter + " rows;");
-            SimpleGridTableTree tree = new SimpleGridTableTree();
-            tree.data = baseCuboidGT;
-            tree.id = baseCuboidId;
-            tree.parent = null;
-            if (counter > 0) {
-                List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
-                Collections.sort(children);
-                for (Long childId : children) {
-                    createNDCuboidGT(tree, baseCuboidId, childId);
-                }
-            }
-            outputGT(baseCuboidId, baseCuboidGT);
-            dropStore(baseCuboidGT);
-
-        } catch (IOException e) {
-            logger.error("Fail to build cube", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    private void buildGTRecord(List<String> row, GTRecord record) {
-
-        Object[] dimensions = buildKey(row);
-        Object[] metricsValues = buildValue(row);
-        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
-        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
-        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
-        record.setValues(recordValues);
-    }
-
-    private boolean gc(TreeNode<GridTable> parentNode) {
-        final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
-        logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
-        for (TreeNode<GridTable> gridTable : gridTables) {
-            final GTComboStore store = (GTComboStore) gridTable.data.getStore();
-            if (store.memoryUsage() > 0) {
-                logger.info("cuboid id:" + gridTable.id + " flush to disk");
-                long t = System.currentTimeMillis();
-                store.switchToDiskStore();
-                logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
-                waitForGc();
-                return true;
-            }
-        }
-        logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
-        return false;
-
-    }
-
-    private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
-        final ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
-            @Override
-            public GridTable call() throws Exception {
-                return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
-            }
-        });
-        try {
-            final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
-            return gridTable;
-        } catch (InterruptedException e) {
-            throw new RuntimeException("this should not happen", e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof OutOfMemoryError) {
-                logger.warn("Future.get() OutOfMemory, stop the thread");
-            } else {
-                throw new RuntimeException("this should not happen", e);
-            }
-        } catch (TimeoutException e) {
-            logger.warn("Future.get() timeout, stop the thread");
-        }
-        logger.info("shutdown executor service");
-        final List<Runnable> runnables = executorService.shutdownNow();
-        try {
-            executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
-            waitForGc();
-        } catch (InterruptedException e) {
-            throw new RuntimeException("this should not happen", e);
-        }
-        return null;
-
-    }
-
-    private void waitForGc() {
-        System.gc();
-        logger.info("wait 5 seconds for gc");
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException("should not happen", e);
-        }
-    }
-
    /**
     * Recursively builds cuboid {@code cuboidId} by aggregating its parent, then
     * depth-first builds all spanning children before the cuboid itself is output
     * and its store dropped. On memory pressure the aggregation is retried after
     * flushing an ancestor to disk via gc().
     *
     * NOTE(review): the tail sequence (output -> dropStore -> detach from parent
     * -> switch parent back to mem store) looks order-dependent; do not reorder.
     */
    private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {

        long startTime = System.currentTimeMillis();

//        GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
//        if (parentStore.memoryUsage() <= 0) {
//            long t = System.currentTimeMillis();
//            parentStore.switchToMemStore();
//            logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
//        }

        GridTable currentCuboid;
        // Retry loop: createChildCuboid() returns null on OOM/timeout; flush an
        // ancestor to disk (gc) and try again until it succeeds or nothing is
        // left to flush.
        while (true) {
            logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
            currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
            if (currentCuboid != null) {
                break;
            } else {
                logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
                if (gc(parentNode)) {
                    continue;
                } else {
                    // Every ancestor is already on disk; we cannot reclaim more.
                    logger.warn("all parent node has been flushed into disk, memory is still insufficient");
                    throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
                }
            }
        }
        // Attach the freshly built cuboid to the spanning tree so gc() can see it.
        SimpleGridTableTree node = new SimpleGridTableTree();
        node.parent = parentNode;
        node.data = currentCuboid;
        node.id = cuboidId;
        parentNode.children.add(node);

        logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");

        // Depth-first: build all children before this cuboid is output/dropped.
        List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
        if (!children.isEmpty()) {
            Collections.sort(children); // sort cuboids
            for (Long childId : children) {
                createNDCuboidGT(node, cuboidId, childId);
            }
        }


        //output the grid table
        outputGT(cuboidId, currentCuboid);
        dropStore(currentCuboid);
        parentNode.children.remove(node);
        if (parentNode.children.size() > 0) {
            // Siblings remain to be built from the parent, so the parent's data
            // must be resident in memory again (gc() may have flushed it).
            logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
            ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
        }
    }
-
-    private void dropStore(GridTable gt) throws IOException {
-        ((GTComboStore) gt.getStore()).drop();
-    }
-
-
-    private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
-        long startTime = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            this.gtRecordWriter.write(cuboidId, record);
-        }
-        logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-    }
-
-    private static class TreeNode<T> {
-        T data;
-        long id;
-        TreeNode<T> parent;
-        List<TreeNode<T>> children = Lists.newArrayList();
-
-        List<TreeNode<T>> getAncestorList() {
-            ArrayList<TreeNode<T>> result = Lists.newArrayList();
-            TreeNode<T> parent = this;
-            while (parent != null) {
-                result.add(parent);
-                parent = parent.parent;
-            }
-            return Lists.reverse(result);
-        }
-
-        @Override
-        public String toString() {
-            return id + "";
-        }
-    }
-
    /** Convenience subclass binding the tree node's payload type to GridTable. */
    private static class SimpleGridTableTree extends TreeNode<GridTable> {
    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b4b10884/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
deleted file mode 100644
index b530cdc..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package org.apache.kylin.streaming.cube;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- */
-public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
-
-    private static final int BENCHMARK_RECORD_LIMIT = 2000000;
-    private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
-        final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-        final CubeDesc desc = cubeSegment.getCubeDesc();
-        for (DimensionDesc dim : desc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (desc.getRowkey().isUseDictionary(col)) {
-                    Dictionary dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
-                    }
-                    logger.info("Dictionary for " + col + " was put into dictionary map.");
-                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
-                }
-            }
-        }
-        return dictionaryMap;
-    }
-
-    private static class ConsoleGTRecordWriter implements IGTRecordWriter {
-
-        boolean verbose = false;
-
-        @Override
-        public void write(Long cuboidId, GTRecord record) throws IOException {
-            if (verbose)
-                System.out.println(record.toString());
-        }
-    }
-
-    private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int counter = 0;
-        while ((line = br.readLine()) != null) {
-            queue.put(Arrays.asList(line.split("\t")));
-            counter++;
-            if (counter == BENCHMARK_RECORD_LIMIT) {
-                break;
-            }
-        }
-        queue.put(Collections.emptyList());
-    }
-
-    private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
-        queue.put(Collections.emptyList());
-    }
-
-
-    @Test
-    public void test() throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-        final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
-        final CubeSegment cubeSegment = cube.getFirstSegment();
-
-        LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
-
-        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cubeBuilder);
-        loadDataFromLocalFile(queue);
-        future.get();
-        logger.info("stream build finished");
-    }
-}