You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:14 UTC

[12/18] kylin git commit: APACHE-KYLIN-2732: add an option for InMemCuboidFromBaseCuboidJob to indicate whether to collect the base cuboid

APACHE-KYLIN-2732: add an option for InMemCuboidFromBaseCuboidJob to indicate whether to collect the base cuboid

Signed-off-by: Zhong <nj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e849cc2a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e849cc2a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e849cc2a

Branch: refs/heads/ci-dong
Commit: e849cc2aae8d4aba04cadfd1830427ea1d1ee5bf
Parents: 1b6d8fe
Author: Ma Gang <mg...@163.com>
Authored: Thu Nov 9 15:20:51 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Nov 23 13:31:34 2017 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/InMemCubeBuilder.java      |  6 +----
 .../cube/inmemcubing/InputConverterUnit.java    | 13 ++++++++--
 .../InputConverterUnitForBaseCuboid.java        | 25 ++++++++++++++++----
 .../InputConverterUnitForRawData.java           | 20 ++++++++++++----
 .../engine/mr/common/AbstractHadoopJob.java     |  3 +++
 .../kylin/engine/mr/common/BatchConstants.java  |  2 ++
 .../mr/steps/InMemCuboidFromBaseCuboidJob.java  |  6 +++++
 .../steps/InMemCuboidFromBaseCuboidMapper.java  | 12 +++++++---
 .../engine/mr/steps/InMemCuboidMapper.java      |  5 ++--
 .../engine/mr/steps/InMemCuboidMapperBase.java  | 14 +++++------
 .../inmemcubing/ITInMemCubeBuilderTest.java     |  2 +-
 11 files changed, 77 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 97bb1de..3a4cf4c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -87,7 +87,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
     private ICuboidCollector resultCollector;
-    private boolean ifBaseCuboidCollected = true;
 
     public InMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
             Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -138,9 +137,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     public <T> ConcurrentNavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
             throws IOException {
-        if (input.inputConverterUnit instanceof InputConverterUnitForBaseCuboid) {
-            ifBaseCuboidCollected = false;
-        }
         final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
         build(input, new ICuboidCollector() {
             @Override
@@ -364,7 +360,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
         logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
 
-        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, ifBaseCuboidCollected);
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, input.inputConverterUnit.ifChange());
     }
 
     private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
index fe32937..aa1c9f4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
@@ -21,13 +21,22 @@ package org.apache.kylin.cube.inmemcubing;
 import org.apache.kylin.gridtable.GTRecord;
 
 public interface InputConverterUnit<T> {
+
+    /** Convert currentObject to a GTRecord*/
     public void convert(T currentObject, GTRecord record);
 
+    /** Check if currentObject is for indicating the end of the data stream*/
     public boolean ifEnd(T currentObject);
 
+    /** Check if currentObject is for cutting the data stream*/
     public boolean ifCut(T currentObject);
 
-    public T getEmptyUnit();
+    /** Get the object indicating the end of the data stream*/
+    public T getEndRow();
+
+    /** Get the object for cutting the data stream*/
+    public T getCutRow();
 
-    public T getCutUnit();
+    /** Get whether the input source is different from the final output cuboid*/
+    public boolean ifChange();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
index 9110a87..be1a38e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
@@ -23,22 +23,32 @@ import org.apache.kylin.gridtable.GTRecord;
 
 public class InputConverterUnitForBaseCuboid implements InputConverterUnit<ByteArray> {
 
-    public static final ByteArray EMPTY_ROW = new ByteArray();
+    public static final ByteArray END_ROW = new ByteArray();
     public static final ByteArray CUT_ROW = new ByteArray(0);
 
+    private final boolean ifChange;
+
+    public InputConverterUnitForBaseCuboid(boolean ifChange) {
+        this.ifChange = ifChange;
+    }
+
+    @Override
     public void convert(ByteArray currentObject, GTRecord record) {
         record.loadColumns(currentObject.asBuffer());
     }
 
+    @Override
     public boolean ifEnd(ByteArray currentObject) {
-        return currentObject == EMPTY_ROW;
+        return currentObject == END_ROW;
     }
 
-    public ByteArray getEmptyUnit() {
-        return EMPTY_ROW;
+    @Override
+    public ByteArray getEndRow() {
+        return END_ROW;
     }
 
-    public ByteArray getCutUnit() {
+    @Override
+    public ByteArray getCutRow() {
         return CUT_ROW;
     }
 
@@ -46,4 +56,9 @@ public class InputConverterUnitForBaseCuboid implements InputConverterUnit<ByteA
     public boolean ifCut(ByteArray currentObject) {
         return currentObject == CUT_ROW;
     }
+
+    @Override
+    public boolean ifChange() {
+        return ifChange;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
index f6548b2..fc34f37 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -44,7 +44,7 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
     private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class);
     
     public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
-    public static final String[] EMPTY_ROW = new String[0];
+    public static final String[] END_ROW = new String[0];
     public static final String[] CUT_ROW = { "" };
 
     private final CubeJoinedFlatTableEnrich flatDesc;
@@ -64,6 +64,7 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
         initNullBytes(cubeDesc);
     }
 
+    @Override
     public final void convert(String[] row, GTRecord record) {
         Object[] dimensions = buildKey(row);
         Object[] metricsValues = buildValue(row);
@@ -73,19 +74,23 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
         record.setValues(recordValues);
     }
 
+    @Override
     public boolean ifEnd(String[] currentObject) {
-        return currentObject == EMPTY_ROW;
+        return currentObject == END_ROW;
     }
 
+    @Override
     public boolean ifCut(String[] currentObject) {
         return currentObject == CUT_ROW;
     }
 
-    public String[] getEmptyUnit() {
-        return EMPTY_ROW;
+    @Override
+    public String[] getEndRow() {
+        return END_ROW;
     }
 
-    public String[] getCutUnit() {
+    @Override
+    public String[] getCutRow() {
         return CUT_ROW;
     }
 
@@ -103,6 +108,11 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
         return key;
     }
 
+    @Override
+    public boolean ifChange() {
+        return true;
+    }
+
     private Object[] buildValue(String[] row) {
         Object[] values = new Object[measureCount];
         for (int i = 0; i < measureCount; i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f5cee9f..b4fb0a0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -118,6 +118,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
     protected static final Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE)
             .hasArg().isRequired(false).withDescription("Cuboid Mode").create(BatchConstants.ARG_CUBOID_MODE);
+    protected static final Option OPTION_NEED_UPDATE_BASE_CUBOID_SHARD = OptionBuilder
+            .withArgName(BatchConstants.ARG_UPDATE_SHARD).hasArg().isRequired(false)
+            .withDescription("If need to update base cuboid shard").create(BatchConstants.ARG_UPDATE_SHARD);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 1d6a582..129c6dd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -29,6 +29,7 @@ public interface BatchConstants {
      * ConFiGuration entry names for MR jobs
      */
 
+    String CFG_UPDATE_SHARD = "update.shard";
     String CFG_CUBOID_MODE = "cuboid.mode";
     String CFG_CUBE_NAME = "cube.name";
     String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
@@ -76,6 +77,7 @@ public interface BatchConstants {
     String ARG_OUTPUT = "output";
     String ARG_PROJECT = "project";
     String ARG_CUBOID_MODE = "cuboidMode";
+    String ARG_UPDATE_SHARD = "updateShard"; // indicate if need update base cuboid shard
     String ARG_JOB_NAME = "jobname";
     String ARG_CUBING_JOB_ID = "cubingJobId";
     String ARG_CUBE_NAME = "cubename";

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
index 62109f4..7bfa33a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
@@ -76,6 +76,7 @@ public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_CUBING_JOB_ID);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_CUBOID_MODE);
+            options.addOption(OPTION_NEED_UPDATE_BASE_CUBOID_SHARD);
             parseOptions(options, args);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
@@ -91,6 +92,10 @@ public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
             if (cuboidModeName == null) {
                 cuboidModeName = CuboidModeEnum.CURRENT.toString();
             }
+            String ifNeedUpdateBaseCuboidShard = getOptionValue(OPTION_NEED_UPDATE_BASE_CUBOID_SHARD);
+            if (ifNeedUpdateBaseCuboidShard == null) {
+                ifNeedUpdateBaseCuboidShard = "false";
+            }
 
             CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSeg, cuboidModeName);
 
@@ -111,6 +116,7 @@ public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName);
+            job.getConfiguration().set(BatchConstants.CFG_UPDATE_SHARD, ifNeedUpdateBaseCuboidShard);
 
             String input = getOptionValue(OPTION_INPUT_PATH);
             FileInputFormat.setInputPaths(job, new Path(input));

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
index 73a39d6..1beebc7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
@@ -40,6 +40,7 @@ import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -64,13 +65,18 @@ public class InMemCuboidFromBaseCuboidMapper
     }
 
     @Override
-    protected InputConverterUnit<ByteArray> getInputConverterUnit() {
-        return new InputConverterUnitForBaseCuboid();
+    protected InputConverterUnit<ByteArray> getInputConverterUnit(Context context) {
+        String updateShard = context.getConfiguration().get(BatchConstants.CFG_UPDATE_SHARD);
+        if (updateShard == null || updateShard.equalsIgnoreCase("false")) {
+            return new InputConverterUnitForBaseCuboid(false);
+        } else {
+            return new InputConverterUnitForBaseCuboid(true);
+        }
     }
 
     @Override
     protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<ByteArray> inputConverterUnit) {
+            int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
         AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
         cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
         cubeBuilder.setConcurrentThreads(taskThreadCount);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 859e126..551a17b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -41,7 +41,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class InMemCuboidMapper<KEYIN>
         extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, ByteArrayWritable, String[]> {
 
-
     private IMRInput.IMRTableInputFormat flatTableInputFormat;
 
     @Override
@@ -52,7 +51,7 @@ public class InMemCuboidMapper<KEYIN>
     }
 
     @Override
-    protected InputConverterUnit<String[]> getInputConverterUnit() {
+    protected InputConverterUnit<String[]> getInputConverterUnit(Context context) {
         Preconditions.checkNotNull(cubeDesc);
         Preconditions.checkNotNull(dictionaryMap);
         return new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap);
@@ -65,7 +64,7 @@ public class InMemCuboidMapper<KEYIN>
 
     @Override
     protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<String[]> inputConverterUnit) {
+            int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
         AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
         cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
         cubeBuilder.setConcurrentThreads(taskThreadCount);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index 7b4738b..73af138 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -73,10 +73,10 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
     protected InputConverterUnit<T> inputConverterUnit;
     private Future<?> future;
 
-    protected abstract InputConverterUnit<T> getInputConverterUnit();
+    protected abstract InputConverterUnit<T> getInputConverterUnit(Context context);
 
-    protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap, int reserveMemoryMB, //
-                                                    CuboidScheduler cuboidScheduler, InputConverterUnit<T> inputConverterUnit);
+    protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
+            int reserveMemoryMB, CuboidScheduler cuboidScheduler);
 
     protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value);
 
@@ -116,8 +116,8 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
 
         taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
         reserveMemoryMB = calculateReserveMB(conf);
-        inputConverterUnit = getInputConverterUnit();
-        future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler, inputConverterUnit);
+        inputConverterUnit = getInputConverterUnit(context);
+        future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler);
     }
 
     private int calculateReserveMB(Configuration configuration) {
@@ -145,7 +145,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
         }
 
         if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) {
-            if (offer(context, inputConverterUnit.getCutUnit(), 1, TimeUnit.MINUTES, 60)) {
+            if (offer(context, inputConverterUnit.getCutRow(), 1, TimeUnit.MINUTES, 60)) {
                 countOfLastSplit = 0;
             } else {
                 throw new IOException("Failed to offer row to internal queue due to queue full!");
@@ -159,7 +159,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
         logger.info("Totally handled " + mapCounter + " records!");
 
         while (!future.isDone()) {
-            if (queue.offer(inputConverterUnit.getEmptyUnit(), 1, TimeUnit.SECONDS)) {
+            if (queue.offer(inputConverterUnit.getEndRow(), 1, TimeUnit.SECONDS)) {
                 break;
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index ad754cd..3f97f80 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -186,7 +186,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
             }
             queue.put(row);
         }
-        queue.put(InputConverterUnitForRawData.EMPTY_ROW);
+        queue.put(InputConverterUnitForRawData.END_ROW);
     }
 
     static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {