You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/09/27 05:52:43 UTC

[13/14] kylin git commit: KYLIN-1762 fix query test error

KYLIN-1762 fix query test error

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab5563a8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab5563a8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab5563a8

Branch: refs/heads/master
Commit: ab5563a8ec060fba48ec8f43244bed6f887b0e83
Parents: be18158
Author: shaofengshi <sh...@apache.org>
Authored: Sun Sep 25 21:41:37 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/Kafka10DataLoader.java  |  2 +-
 .../cube/model/CubeJoinedFlatTableDesc.java     |  2 +-
 .../mr/steps/FactDistinctColumnPartitioner.java |  3 ++
 .../engine/mr/steps/FactDistinctColumnsJob.java |  2 +-
 .../mr/steps/FactDistinctColumnsReducer.java    | 38 +++++++++-----
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 49 ++++++++++++++++--
 .../kafka/DEFAULT.STREAMING_TABLE.json          |  1 +
 .../kylin/provision/BuildCubeWithStream.java    | 52 ++++++++++++--------
 .../kylin/provision/BuildCubeWithStream2.java   |  4 +-
 .../apache/kylin/query/ITKylinQueryTest.java    |  3 ++
 .../org/apache/kylin/query/KylinTestBase.java   |  2 +-
 .../apache/kylin/source/kafka/KafkaMRInput.java |  2 +-
 .../source/kafka/TimedJsonStreamParser.java     |  7 +--
 13 files changed, 117 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index 2b299cc..8c548be 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -65,7 +65,7 @@ public class Kafka10DataLoader extends StreamDataLoader {
         props.put("retry.backoff.ms", "1000");
         KafkaProducer producer = KafkaClient.getKafkaProducer(brokerList, props);
 
-        for (int i = 0; i < messages.size(); ++i) {
+        for (int i = 0; i < messages.size(); i++) {
             ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
             producer.send(keyedMessage);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 6ca89c8..5cd4f1d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -143,7 +143,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     public int getColumnIndex(TblColRef colRef) {
         Integer index = columnIndexMap.get(colRef);
         if (index == null)
-            throw new IllegalArgumentException("Column " + colRef.toString() + " wasn't found on flat table.");
+            return -1;
 
         return index.intValue();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index a631cf4..6973c4b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -34,6 +34,9 @@ public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
         if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
+        } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
+            // the second-to-last reducer is for the partition col min/max values
+            return numReduceTasks - 2;
         } else {
             int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
             return colIndex;

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index a6c4d30..a9cc17f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 System.out.println("Found segment " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 1 : columnsNeedDict.size());
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 0c13df7..2889ba8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -65,7 +65,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     private List<ByteArray> colValues;
     private TblColRef col = null;
     private boolean isStatistics = false;
-    private boolean outputTouched = false;
+    private boolean isPartitionCol = false;
     private KylinConfig cubeConfig;
     protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
@@ -92,25 +92,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
             baseCuboidRowCountInMappers = Lists.newArrayList();
             cuboidHLLMap = Maps.newHashMap();
             samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+        } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
+            // partition col
+            isStatistics = false;
+            isPartitionCol = true;
+            col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            colValues = Lists.newLinkedList();
         } else {
             // col
             isStatistics = false;
+            isPartitionCol = false;
             col = columnList.get(taskId);
-            colValues = Lists.newArrayList();
+            colValues = Lists.newLinkedList();
         }
     }
 
     @Override
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
-        if (isStatistics == false) {
-            colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
-            if (colValues.size() == 1000000) { //spill every 1 million
-                logger.info("spill values to disk...");
-                outputDistinctValues(col, colValues, context);
-                colValues.clear();
-            }
-        } else {
+        if (isStatistics == true) {
             // for hll
             long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
             for (Text value : values) {
@@ -130,6 +130,21 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
                     cuboidHLLMap.put(cuboidId, hll);
                 }
             }
+        } else if (isPartitionCol == true) {
+            // for partition col min/max value
+            ByteArray value = new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1));
+            if (colValues.size() > 1) {
+                colValues.set(1, value);
+            } else {
+                colValues.add(value);
+            }
+        } else {
+            colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
+            if (colValues.size() == 1000000) { //spill every 1 million
+                logger.info("spill values to disk...");
+                outputDistinctValues(col, colValues, context);
+                colValues.clear();
+            }
         }
 
     }
@@ -156,7 +171,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
             }
         } finally {
             IOUtils.closeQuietly(out);
-            outputTouched = true;
         }
     }
 
@@ -164,7 +178,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     protected void cleanup(Context context) throws IOException, InterruptedException {
 
         if (isStatistics == false) {
-            if (!outputTouched || colValues.size() > 0) {
+            if (colValues.size() > 0) {
                 outputDistinctValues(col, colValues, context);
                 colValues.clear();
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 5e278f8..86ef487 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  */
@@ -52,8 +53,12 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     private ByteArray[] row_hashcodes = null;
     private ByteBuffer keyBuffer;
     private static final Text EMPTY_TEXT = new Text();
+    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
     public static final byte MARK_FOR_HLL = (byte) 0xFF;
 
+    private int partitionColumnIndex = -1;
+    private boolean needFetchPartitionCol = true;
+
     @Override
     protected void setup(Context context) throws IOException {
         super.setup(context);
@@ -81,6 +86,26 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
             for (int i = 0; i < nRowKey; i++) {
                 row_hashcodes[i] = new ByteArray();
             }
+
+            TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            if (partitionColRef != null) {
+                partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
+            }
+
+            // check whether need fetch the partition col values
+            if (partitionColumnIndex < 0) {
+                // if partition col not on cube, no need
+                needFetchPartitionCol = false;
+            } else {
+                for (int x : dictionaryColumnIndex) {
+                    if (x == partitionColumnIndex) {
+                        // if partition col already build dict, no need
+                        needFetchPartitionCol = false;
+                        break;
+                    }
+                }
+            }
+
         }
     }
 
@@ -108,24 +133,38 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     @Override
     public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         String[] row = flatTableInputFormat.parseMapperInput(record);
+
+        keyBuffer.clear();
         try {
             for (int i = 0; i < factDictCols.size(); i++) {
                 String fieldValue = row[dictionaryColumnIndex[i]];
                 if (fieldValue == null)
                     continue;
-
-                keyBuffer.clear();
+                int offset = keyBuffer.position();
                 keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
                 keyBuffer.put(Bytes.toBytes(fieldValue));
-                outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+                outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
                 context.write(outputKey, EMPTY_TEXT);
             }
         } catch (Exception ex) {
             handleErrorRecord(row, ex);
         }
 
-        if (collectStatistics && rowCount < samplingPercentage) {
-            putRowKeyToHLL(row);
+        if (collectStatistics) {
+            if (rowCount < samplingPercentage) {
+                putRowKeyToHLL(row);
+            }
+
+            if (needFetchPartitionCol == true) {
+                String fieldValue = row[partitionColumnIndex];
+                if (fieldValue != null) {
+                    int offset = keyBuffer.position();
+                    keyBuffer.put(MARK_FOR_PARTITION_COL);
+                    keyBuffer.put(Bytes.toBytes(fieldValue));
+                    outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
+                    context.write(outputKey, EMPTY_TEXT);
+                }
+            }
         }
 
         if (rowCount++ == 100)

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
index 6a64cce..e3ac2d6 100644
--- a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
@@ -6,6 +6,7 @@
   "timeout": 60000,
   "bufferSize": 65536,
   "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
+  "parserProperties": "tsColName=timestamp",
   "last_modified": 0,
   "clusters": [
     {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index bfe1d0a..dfcedfb 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -69,10 +69,19 @@ public class BuildCubeWithStream {
 
     private KafkaConfig kafkaConfig;
     private MockKafka kafkaServer;
+    protected static boolean fastBuildMode = false;
 
     public void before() throws Exception {
         deployEnv();
 
+        String fastModeStr = System.getProperty("fastBuildMode");
+        if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) {
+            fastBuildMode = true;
+            logger.info("Will use fast build mode");
+        } else {
+            logger.info("Will not use fast build mode");
+        }
+
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
         scheduler = DefaultScheduler.createInstance();
@@ -139,29 +148,32 @@ public class BuildCubeWithStream {
         generateStreamData(date1, date2, numberOfRecrods1);
         ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE);
         Assert.assertTrue(result == ExecutableState.SUCCEED);
-        long date3 = f.parse("2013-04-01").getTime();
-        int numberOfRecords2 = 5000;
-        generateStreamData(date2, date3, numberOfRecords2);
-        result = buildSegment(cubeName, 0, Long.MAX_VALUE);
-        Assert.assertTrue(result == ExecutableState.SUCCEED);
 
-        //empty build
-        result = buildSegment(cubeName, 0, Long.MAX_VALUE);
-        Assert.assertTrue(result == ExecutableState.DISCARDED);
+        if (fastBuildMode == false) {
+            long date3 = f.parse("2013-04-01").getTime();
+            int numberOfRecords2 = 5000;
+            generateStreamData(date2, date3, numberOfRecords2);
+            result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+            Assert.assertTrue(result == ExecutableState.SUCCEED);
 
-        //merge
-        result = mergeSegment(cubeName, 0, 15000);
-        Assert.assertTrue(result == ExecutableState.SUCCEED);
+            //empty build
+            result = buildSegment(cubeName, 0, Long.MAX_VALUE);
+            Assert.assertTrue(result == ExecutableState.DISCARDED);
+
+            //merge
+            result = mergeSegment(cubeName, 0, 15000);
+            Assert.assertTrue(result == ExecutableState.SUCCEED);
 
-        List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
-        Assert.assertTrue(segments.size() == 1);
+            List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments();
+            Assert.assertTrue(segments.size() == 1);
 
-        CubeSegment toRefreshSeg = segments.get(0);
-        HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
+            CubeSegment toRefreshSeg = segments.get(0);
+            HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo();
 
-        refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
-        segments = cubeManager.getCube(cubeName).getSegments();
-        Assert.assertTrue(segments.size() == 1);
+            refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap);
+            segments = cubeManager.getCube(cubeName).getSegments();
+            Assert.assertTrue(segments.size() == 1);
+        }
 
     }
 
@@ -197,8 +209,8 @@ public class BuildCubeWithStream {
 
     protected void deployEnv() throws IOException {
         DeployUtil.overrideJobJarLocations();
-        //DeployUtil.initCliWorkDir();
-        //DeployUtil.deployMetadata();
+//        DeployUtil.initCliWorkDir();
+//        DeployUtil.deployMetadata();
     }
 
     public static void beforeClass() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
index 7959701..d8c857f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java
@@ -45,7 +45,7 @@ import static java.lang.Thread.sleep;
 public class BuildCubeWithStream2 extends BuildCubeWithStream {
 
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class);
-    private static boolean generateData = true;
+    private boolean generateData = true;
 
     @Override
     public void build() throws Exception {
@@ -76,6 +76,7 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream {
 
         List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
         for (int i = 0; i < 5; i++) {
+            Thread.sleep(2 * 60 * 1000); // sleep 2 minutes
             FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
                 @Override
                 public ExecutableState call() {
@@ -92,7 +93,6 @@ public class BuildCubeWithStream2 extends BuildCubeWithStream {
 
             executorService.submit(futureTask);
             futures.add(futureTask);
-            Thread.sleep(2 * 60 * 1000); // sleep 2 mintues
         }
 
         generateData = false; // stop generating message to kafka

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 59a3a04..93d47f1 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -65,6 +66,8 @@ public class ITKylinQueryTest extends KylinTestBase {
 
         RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
         RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
+
+        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index d0bcf52..57c4f4d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -586,7 +586,7 @@ public class KylinTestBase {
         //setup cube conn
         File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config);
         Properties props = new Properties();
-        props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "15001");
+        props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "20001");
         cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props);
 
         //setup h2

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index a5f678f..729719a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -118,7 +118,7 @@ public class KafkaMRInput implements IMRInput {
                 }
             }
             Text text = (Text) mapperInput;
-            ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice();
+            ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength());
             StreamingMessage streamingMessage = streamingParser.parse(buffer);
             return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab5563a8/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index d3530f1..148ae25 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -47,7 +47,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
     private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
 
     private List<TblColRef> allColumns;
-    private boolean formatTs = false;//not used
     private final ObjectMapper mapper = new ObjectMapper();
     private String tsColName = "timestamp";
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
@@ -61,9 +60,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
                     String[] parts = prop.split("=");
                     if (parts.length == 2) {
                         switch (parts[0]) {
-                        case "formatTs":
-                            this.formatTs = Boolean.valueOf(parts[1]);
-                            break;
                         case "tsColName":
                             this.tsColName = parts[1];
                             break;
@@ -78,7 +74,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
             }
         }
 
-        logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", formatTs, tsColName);
+        logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
     }
 
     @Override
@@ -105,7 +101,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 }
             }
 
-            logger.info("Streaming Message: " + result.toString());
             return new StreamingMessage(result, 0, t, Collections.<String, Object> emptyMap());
         } catch (IOException e) {
             logger.error("error", e);