You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:14 UTC
[12/18] kylin git commit: APACHE-KYLIN-2732: add option for
InMemCuboidFromBaseCuboidJob to indicate whether collect base cuboid or not
APACHE-KYLIN-2732: add option for InMemCuboidFromBaseCuboidJob to indicate whether collect base cuboid or not
Signed-off-by: Zhong <nj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e849cc2a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e849cc2a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e849cc2a
Branch: refs/heads/ci-dong
Commit: e849cc2aae8d4aba04cadfd1830427ea1d1ee5bf
Parents: 1b6d8fe
Author: Ma Gang <mg...@163.com>
Authored: Thu Nov 9 15:20:51 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Nov 23 13:31:34 2017 +0800
----------------------------------------------------------------------
.../cube/inmemcubing/InMemCubeBuilder.java | 6 +----
.../cube/inmemcubing/InputConverterUnit.java | 13 ++++++++--
.../InputConverterUnitForBaseCuboid.java | 25 ++++++++++++++++----
.../InputConverterUnitForRawData.java | 20 ++++++++++++----
.../engine/mr/common/AbstractHadoopJob.java | 3 +++
.../kylin/engine/mr/common/BatchConstants.java | 2 ++
.../mr/steps/InMemCuboidFromBaseCuboidJob.java | 6 +++++
.../steps/InMemCuboidFromBaseCuboidMapper.java | 12 +++++++---
.../engine/mr/steps/InMemCuboidMapper.java | 5 ++--
.../engine/mr/steps/InMemCuboidMapperBase.java | 14 +++++------
.../inmemcubing/ITInMemCubeBuilderTest.java | 2 +-
11 files changed, 77 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 97bb1de..3a4cf4c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -87,7 +87,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private CuboidResult baseResult;
private Object[] totalSumForSanityCheck;
private ICuboidCollector resultCollector;
- private boolean ifBaseCuboidCollected = true;
public InMemCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
Map<TblColRef, Dictionary<String>> dictionaryMap) {
@@ -138,9 +137,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
public <T> ConcurrentNavigableMap<Long, CuboidResult> build(RecordConsumeBlockingQueueController<T> input)
throws IOException {
- if (input.inputConverterUnit instanceof InputConverterUnitForBaseCuboid) {
- ifBaseCuboidCollected = false;
- }
final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
build(input, new ICuboidCollector() {
@Override
@@ -364,7 +360,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
- return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, ifBaseCuboidCollected);
+ return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, input.inputConverterUnit.ifChange());
}
private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
index fe32937..aa1c9f4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnit.java
@@ -21,13 +21,22 @@ package org.apache.kylin.cube.inmemcubing;
import org.apache.kylin.gridtable.GTRecord;
public interface InputConverterUnit<T> {
+
+ /** Convert currentObject to a GTRecord*/
public void convert(T currentObject, GTRecord record);
+ /** Check if currentObject is for indicating the end of the data stream*/
public boolean ifEnd(T currentObject);
+ /** Check if currentObject is for cutting the data stream*/
public boolean ifCut(T currentObject);
- public T getEmptyUnit();
+ /** Get the object indicating the end of the data stream*/
+ public T getEndRow();
+
+ /** Get the object for cutting the data stream*/
+ public T getCutRow();
- public T getCutUnit();
+ /** Get whether the input source is different from the final output cuboid*/
+ public boolean ifChange();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
index 9110a87..be1a38e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForBaseCuboid.java
@@ -23,22 +23,32 @@ import org.apache.kylin.gridtable.GTRecord;
public class InputConverterUnitForBaseCuboid implements InputConverterUnit<ByteArray> {
- public static final ByteArray EMPTY_ROW = new ByteArray();
+ public static final ByteArray END_ROW = new ByteArray();
public static final ByteArray CUT_ROW = new ByteArray(0);
+ private final boolean ifChange;
+
+ public InputConverterUnitForBaseCuboid(boolean ifChange) {
+ this.ifChange = ifChange;
+ }
+
+ @Override
public void convert(ByteArray currentObject, GTRecord record) {
record.loadColumns(currentObject.asBuffer());
}
+ @Override
public boolean ifEnd(ByteArray currentObject) {
- return currentObject == EMPTY_ROW;
+ return currentObject == END_ROW;
}
- public ByteArray getEmptyUnit() {
- return EMPTY_ROW;
+ @Override
+ public ByteArray getEndRow() {
+ return END_ROW;
}
- public ByteArray getCutUnit() {
+ @Override
+ public ByteArray getCutRow() {
return CUT_ROW;
}
@@ -46,4 +56,9 @@ public class InputConverterUnitForBaseCuboid implements InputConverterUnit<ByteA
public boolean ifCut(ByteArray currentObject) {
return currentObject == CUT_ROW;
}
+
+ @Override
+ public boolean ifChange() {
+ return ifChange;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
index f6548b2..fc34f37 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -44,7 +44,7 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class);
public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
- public static final String[] EMPTY_ROW = new String[0];
+ public static final String[] END_ROW = new String[0];
public static final String[] CUT_ROW = { "" };
private final CubeJoinedFlatTableEnrich flatDesc;
@@ -64,6 +64,7 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
initNullBytes(cubeDesc);
}
+ @Override
public final void convert(String[] row, GTRecord record) {
Object[] dimensions = buildKey(row);
Object[] metricsValues = buildValue(row);
@@ -73,19 +74,23 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
record.setValues(recordValues);
}
+ @Override
public boolean ifEnd(String[] currentObject) {
- return currentObject == EMPTY_ROW;
+ return currentObject == END_ROW;
}
+ @Override
public boolean ifCut(String[] currentObject) {
return currentObject == CUT_ROW;
}
- public String[] getEmptyUnit() {
- return EMPTY_ROW;
+ @Override
+ public String[] getEndRow() {
+ return END_ROW;
}
- public String[] getCutUnit() {
+ @Override
+ public String[] getCutRow() {
return CUT_ROW;
}
@@ -103,6 +108,11 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
return key;
}
+ @Override
+ public boolean ifChange() {
+ return true;
+ }
+
private Object[] buildValue(String[] row) {
Object[] values = new Object[measureCount];
for (int i = 0; i < measureCount; i++) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f5cee9f..b4fb0a0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -118,6 +118,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
.withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
protected static final Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE)
.hasArg().isRequired(false).withDescription("Cuboid Mode").create(BatchConstants.ARG_CUBOID_MODE);
+ protected static final Option OPTION_NEED_UPDATE_BASE_CUBOID_SHARD = OptionBuilder
+ .withArgName(BatchConstants.ARG_UPDATE_SHARD).hasArg().isRequired(false)
+ .withDescription("If need to update base cuboid shard").create(BatchConstants.ARG_UPDATE_SHARD);
private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 1d6a582..129c6dd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -29,6 +29,7 @@ public interface BatchConstants {
* ConFiGuration entry names for MR jobs
*/
+ String CFG_UPDATE_SHARD = "update.shard";
String CFG_CUBOID_MODE = "cuboid.mode";
String CFG_CUBE_NAME = "cube.name";
String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
@@ -76,6 +77,7 @@ public interface BatchConstants {
String ARG_OUTPUT = "output";
String ARG_PROJECT = "project";
String ARG_CUBOID_MODE = "cuboidMode";
+ String ARG_UPDATE_SHARD = "updateShard"; // indicate if need update base cuboid shard
String ARG_JOB_NAME = "jobname";
String ARG_CUBING_JOB_ID = "cubingJobId";
String ARG_CUBE_NAME = "cubename";
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
index 62109f4..7bfa33a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
@@ -76,6 +76,7 @@ public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
options.addOption(OPTION_CUBING_JOB_ID);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_CUBOID_MODE);
+ options.addOption(OPTION_NEED_UPDATE_BASE_CUBOID_SHARD);
parseOptions(options, args);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
@@ -91,6 +92,10 @@ public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
if (cuboidModeName == null) {
cuboidModeName = CuboidModeEnum.CURRENT.toString();
}
+ String ifNeedUpdateBaseCuboidShard = getOptionValue(OPTION_NEED_UPDATE_BASE_CUBOID_SHARD);
+ if (ifNeedUpdateBaseCuboidShard == null) {
+ ifNeedUpdateBaseCuboidShard = "false";
+ }
CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSeg, cuboidModeName);
@@ -111,6 +116,7 @@ public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName);
+ job.getConfiguration().set(BatchConstants.CFG_UPDATE_SHARD, ifNeedUpdateBaseCuboidShard);
String input = getOptionValue(OPTION_INPUT_PATH);
FileInputFormat.setInputPaths(job, new Path(input));
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
index 73a39d6..1beebc7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
@@ -40,6 +40,7 @@ import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.metadata.model.TblColRef;
@@ -64,13 +65,18 @@ public class InMemCuboidFromBaseCuboidMapper
}
@Override
- protected InputConverterUnit<ByteArray> getInputConverterUnit() {
- return new InputConverterUnitForBaseCuboid();
+ protected InputConverterUnit<ByteArray> getInputConverterUnit(Context context) {
+ String updateShard = context.getConfiguration().get(BatchConstants.CFG_UPDATE_SHARD);
+ if (updateShard == null || updateShard.equalsIgnoreCase("false")) {
+ return new InputConverterUnitForBaseCuboid(false);
+ } else {
+ return new InputConverterUnitForBaseCuboid(true);
+ }
}
@Override
protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
- int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<ByteArray> inputConverterUnit) {
+ int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
cubeBuilder.setConcurrentThreads(taskThreadCount);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 859e126..551a17b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -41,7 +41,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class InMemCuboidMapper<KEYIN>
extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, ByteArrayWritable, String[]> {
-
private IMRInput.IMRTableInputFormat flatTableInputFormat;
@Override
@@ -52,7 +51,7 @@ public class InMemCuboidMapper<KEYIN>
}
@Override
- protected InputConverterUnit<String[]> getInputConverterUnit() {
+ protected InputConverterUnit<String[]> getInputConverterUnit(Context context) {
Preconditions.checkNotNull(cubeDesc);
Preconditions.checkNotNull(dictionaryMap);
return new InputConverterUnitForRawData(cubeDesc, flatDesc, dictionaryMap);
@@ -65,7 +64,7 @@ public class InMemCuboidMapper<KEYIN>
@Override
protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
- int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<String[]> inputConverterUnit) {
+ int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
cubeBuilder.setConcurrentThreads(taskThreadCount);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index 7b4738b..73af138 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -73,10 +73,10 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
protected InputConverterUnit<T> inputConverterUnit;
private Future<?> future;
- protected abstract InputConverterUnit<T> getInputConverterUnit();
+ protected abstract InputConverterUnit<T> getInputConverterUnit(Context context);
- protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap, int reserveMemoryMB, //
- CuboidScheduler cuboidScheduler, InputConverterUnit<T> inputConverterUnit);
+ protected abstract Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
+ int reserveMemoryMB, CuboidScheduler cuboidScheduler);
protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value);
@@ -116,8 +116,8 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
reserveMemoryMB = calculateReserveMB(conf);
- inputConverterUnit = getInputConverterUnit();
- future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler, inputConverterUnit);
+ inputConverterUnit = getInputConverterUnit(context);
+ future = getCubingThreadFuture(context, dictionaryMap, reserveMemoryMB, cuboidScheduler);
}
private int calculateReserveMB(Configuration configuration) {
@@ -145,7 +145,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
}
if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) {
- if (offer(context, inputConverterUnit.getCutUnit(), 1, TimeUnit.MINUTES, 60)) {
+ if (offer(context, inputConverterUnit.getCutRow(), 1, TimeUnit.MINUTES, 60)) {
countOfLastSplit = 0;
} else {
throw new IOException("Failed to offer row to internal queue due to queue full!");
@@ -159,7 +159,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
logger.info("Totally handled " + mapCounter + " records!");
while (!future.isDone()) {
- if (queue.offer(inputConverterUnit.getEmptyUnit(), 1, TimeUnit.SECONDS)) {
+ if (queue.offer(inputConverterUnit.getEndRow(), 1, TimeUnit.SECONDS)) {
break;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e849cc2a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index ad754cd..3f97f80 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -186,7 +186,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
}
queue.put(row);
}
- queue.put(InputConverterUnitForRawData.EMPTY_ROW);
+ queue.put(InputConverterUnitForRawData.END_ROW);
}
static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {