Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/06 06:46:09 UTC

[1/3] kylin git commit: KYLIN-2059 remove the time threshold on storage cleanup

Repository: kylin
Updated Branches:
  refs/heads/master 792d4ee3f -> 95ebc8512


KYLIN-2059 remove the time threshold on storage cleanup

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/95ebc851
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/95ebc851
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/95ebc851

Branch: refs/heads/master
Commit: 95ebc851274953023ba092a249c1ddbbee5cd845
Parents: 334c2e0
Author: shaofengshi <sh...@apache.org>
Authored: Fri Sep 30 09:13:03 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfigBase.java     |  4 ----
 .../kylin/engine/mr/steps/MetadataCleanupJob.java    |  2 +-
 .../kylin/storage/hbase/util/StorageCleanupJob.java  | 15 ++++-----------
 3 files changed, 5 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/95ebc851/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4d1639b..99c3c5a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -780,10 +780,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Long.parseLong(getOptional("kylin.query.hbase.hconnection.threads.alive.seconds", "60"));
     }
 
-    public long getStorageCleanupTimeThreshold() {
-        return Long.valueOf(this.getOptional("kylin.storage.cleanup.time.threshold", "172800000")); //default two days
-    }
-
     public int getAppendDictEntrySize() {
         return Integer.parseInt(getOptional("kylin.dict.append.entry.size", "10000000"));
     }
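
Every getter in KylinConfigBase follows the same pattern: getOptional(key, default) reads the string value from kylin.properties (falling back when unset) and the typed getter parses it. A minimal sketch of that convention, with a hypothetical key the codebase does not actually define:

    // Hypothetical getter in the KylinConfigBase style; the key
    // "kylin.example.timeout.ms" is illustrative only.
    public long getExampleTimeoutMs() {
        return Long.parseLong(getOptional("kylin.example.timeout.ms", "60000"));
    }

With getStorageCleanupTimeThreshold() removed, the kylin.storage.cleanup.time.threshold property is retired as well; a kylin.properties file that still sets it is simply ignored.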

http://git-wip-us.apache.org/repos/asf/kylin/blob/95ebc851/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
index 962697e..f2b1d6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
@@ -54,7 +54,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
 
     private KylinConfig config = null;
 
-    public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000L; // 2 days
+    public static final long TIME_THREADSHOLD = 1 * 3600 * 1000L; // 1 hour
     public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000L; // 30 days
 
     /*
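
The constant keeps its (misspelled) historical name but shrinks from two days to one hour, so stale metadata becomes cleanup-eligible much sooner; job resources keep their separate 30-day window. A sketch of the eligibility test these constants feed, assuming (as is typical for this job) a last-modified timestamp in epoch milliseconds:

    // Sketch: a resource is a cleanup candidate once older than the threshold.
    // 1 * 3600 * 1000L = 3,600,000 ms; the old value was 172,800,000 ms.
    boolean candidate = System.currentTimeMillis() - lastModified > TIME_THREADSHOLD;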

http://git-wip-us.apache.org/repos/asf/kylin/blob/95ebc851/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index dffce36..2c2f11c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -34,7 +34,6 @@ import java.util.regex.Pattern;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,10 +62,10 @@ public class StorageCleanupJob extends AbstractApplication {
 
     @SuppressWarnings("static-access")
     protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
-    protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force");
+    protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete all kylin intermediate hive tables").create("force");
 
     protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
-    public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
+    public static final int deleteTimeout = 10; // Unit minute
 
     protected boolean delete = false;
     protected boolean force = false;
@@ -74,7 +73,6 @@ public class StorageCleanupJob extends AbstractApplication {
 
     private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        long TIME_THREADSHOLD = KylinConfig.getInstanceFromEnv().getStorageCleanupTimeThreshold();
         // get all kylin hbase tables
         HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
@@ -82,14 +80,9 @@ public class StorageCleanupJob extends AbstractApplication {
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
         for (HTableDescriptor desc : tableDescriptors) {
             String host = desc.getValue(IRealizationConstants.HTableTag);
-            String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
             if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
                //only take care of htables that belong to self
-                if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
                     allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
-                } else {
-                    logger.info("Exclude table " + desc.getTableName().getNameAsString() + " from drop list, as it is newly created");
-                }
             }
         }
 
@@ -111,9 +104,9 @@ public class StorageCleanupJob extends AbstractApplication {
                 FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName));
                 executorService.execute(futureTask);
                 try {
-                    futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES);
+                    futureTask.get(deleteTimeout, TimeUnit.MINUTES);
                 } catch (TimeoutException e) {
-                    logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!");
+                    logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + deleteTimeout + " minutes!");
                     futureTask.cancel(true);
                 } catch (Exception e) {
                     e.printStackTrace();
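
Since newly created HTables are no longer protected by a grace period, a dry run before deleting is the safer workflow. A typical invocation, hedged against deployment differences (check your kylin.sh location):

    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.storage.hbase.util.StorageCleanupJob --delete false
    # review the candidate tables and paths reported in the log, then:
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.storage.hbase.util.StorageCleanupJob --delete true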


[2/3] kylin git commit: KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()

Posted by sh...@apache.org.
KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/334c2e09
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/334c2e09
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/334c2e09

Branch: refs/heads/master
Commit: 334c2e09ba443b8fdcb7d4bfe08ab8fbc0ac3fbe
Parents: aa51ce0
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 22:56:00 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 +
 .../java/org/apache/kylin/cube/CubeManager.java | 77 ++++++++-------
 .../org/apache/kylin/cube/CubeManagerTest.java  | 99 ++++++++++++++++++++
 3 files changed, 145 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 838ef97..4d1639b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -809,4 +809,8 @@ abstract public class KylinConfigBase implements Serializable {
     public int getMaxBuildingSegments() {
         return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2"));
     }
+
+    public void setMaxBuildingSegments(int maxBuildingSegments) {
+        setProperty("kylin.cube.building.segment.max", String.valueOf(maxBuildingSegments));
+    }
 }
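
The setter mirrors the existing getter and exists mainly so tests can lift the default cap of two concurrently building segments; the new CubeManagerTest cases below use it exactly this way:

    // From the tests added in this commit (getTestConfig() comes from
    // LocalFileMetadataTestCase):
    getTestConfig().setMaxBuildingSegments(10);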

http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 962568c..d243f4d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -135,7 +135,7 @@ public class CubeManager implements IRealizationProvider {
         logger.info("Initializing CubeManager with config " + config);
         this.config = config;
         this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
-        
+
         // touch lower level metadata before registering my listener
         loadAllCubeInstance();
         Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
@@ -159,12 +159,12 @@ public class CubeManager implements IRealizationProvider {
         @Override
         public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
             String cubeName = cacheKey;
-            
+
             if (event == Event.DROP)
                 removeCubeLocal(cubeName);
             else
                 reloadCubeLocal(cubeName);
-            
+
             for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, cubeName)) {
                 broadcaster.notifyProjectDataUpdate(prj.getName());
             }
@@ -615,7 +615,6 @@ public class CubeManager implements IRealizationProvider {
         return max;
     }
 
-
     private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
         List<CubeSegment> existing = cube.getSegments();
         if (existing.isEmpty()) {
@@ -625,7 +624,6 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-
     private long calculateStartDateForAppendSegment(CubeInstance cube) {
         List<CubeSegment> existing = cube.getSegments();
         if (existing.isEmpty()) {
@@ -728,7 +726,7 @@ public class CubeManager implements IRealizationProvider {
 
         List<CubeSegment> mergingSegs = Lists.newArrayList();
         if (buildingSegs.size() > 0) {
-            
+
             for (CubeSegment building : buildingSegs) {
                 // exclude those under-merging segs
                 for (CubeSegment ready : readySegs) {
@@ -760,27 +758,22 @@ public class CubeManager implements IRealizationProvider {
         return null;
     }
 
-    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
-        List<CubeSegment> tobe = calculateToBeSegments(cube);
+    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+        if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
 
-        for (CubeSegment seg : newSegments) {
-            if (tobe.contains(seg) == false)
-                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " is expected but not in the tobe " + tobe);
+        if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
 
-            if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
-                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+        if (isReady(newSegment) == true)
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
 
-            if (StringUtils.isBlank(seg.getLastBuildJobID()))
-                throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+        List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment);
 
-            seg.setStatus(SegmentStatusEnum.READY);
-        }
+        if (tobe.contains(newSegment) == false)
+            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
 
-        for (CubeSegment seg : tobe) {
-            if (isReady(seg) == false) {
-                logger.warn("For cube " + cube + ", segment " + seg + " isn't READY yet.");
-            }
-        }
+        newSegment.setStatus(SegmentStatusEnum.READY);
 
         List<CubeSegment> toRemoveSegs = Lists.newArrayList();
         for (CubeSegment segment : cube.getSegments()) {
@@ -788,14 +781,14 @@ public class CubeManager implements IRealizationProvider {
                 toRemoveSegs.add(segment);
         }
 
-        logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(newSegments) + ", to remove segments " + toRemoveSegs);
+        logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegments).setStatus(RealizationStatusEnum.READY);
+        cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
         updateCube(cubeBuilder);
     }
 
-    public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+    public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
         List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
         List<CubeSegment> newList = Arrays.asList(newSegments);
         if (tobe.containsAll(newList) == false) {
@@ -809,11 +802,12 @@ public class CubeManager implements IRealizationProvider {
      * - Favors new segments over the old
      * - Favors big segments over the small
      */
-    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... newSegments) {
+    private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) {
 
         List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
-        if (newSegments != null)
-            tobe.addAll(Arrays.asList(newSegments));
+        if (newSegments != null && !tobe.contains(newSegments)) {
+            tobe.add(newSegments);
+        }
         if (tobe.size() == 0)
             return tobe;
 
@@ -849,13 +843,17 @@ public class CubeManager implements IRealizationProvider {
                     } else {
                         tobe.remove(j);
                     }
-                } else if (isNew(is)) {
-                    // otherwise, favor the new segment
-                    tobe.remove(j);
+                    continue;
                 } else {
-                    tobe.remove(i);
+                    // otherwise, favor the new segment
+                    if (isNew(is) && is.equals(newSegments)) {
+                        tobe.remove(j);
+                        continue;
+                    } else if (js.equals(newSegments)) {
+                        tobe.remove(i);
+                        continue;
+                    }
                 }
-                continue;
             }
 
             // if i, j in sequence
@@ -865,8 +863,17 @@ public class CubeManager implements IRealizationProvider {
                 continue;
             }
 
-            // seems j not fitting
-            tobe.remove(j);
+            // js can be covered by is
+            if (is.equals(newSegments)) {
+                // seems j not fitting
+                tobe.remove(j);
+                continue;
+            } else {
+                i++;
+                j++;
+                continue;
+            }
+
         }
 
         return tobe;
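
The shape of the race this closes, reconstructed from the diff rather than quoted from the JIRA: two build/merge jobs finish around the same time, and each old-style promoteNewlyBuiltSegments(cube, segs...) recomputed the to-be list over all segments, so either promotion could evict the other job's still-NEW segment as an overlap conflict. Promotion is now single-segment, and the tie-break only removes a segment in favor of the one actually being promoted:

    // Sketch of the new conflict resolution inside calculateToBeSegments():
    // an overlapping segment is discarded only when the survivor is the
    // segment under promotion (newSegments); other NEW segments are left
    // intact for their own promotion pass.
    if (isNew(is) && is.equals(newSegments)) {
        tobe.remove(j);
    } else if (js.equals(newSegments)) {
        tobe.remove(i);
    }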

http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 49bb128..e63fe99 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -123,6 +124,104 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
 
     }
 
+
+    @Test
+    public void testConcurrentBuildAndMerge() throws Exception {
+        CubeManager mgr = CubeManager.getInstance(getTestConfig());
+        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+        getTestConfig().setMaxBuildingSegments(10);
+        // no segment at first
+        assertEquals(0, cube.getSegments().size());
+
+        // append first
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+        seg1.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+        seg2.setStatus(SegmentStatusEnum.READY);
+
+
+        CubeSegment seg3 = mgr.mergeSegments(cube, 0, 0, 0000, 2000, true);
+        seg3.setStatus(SegmentStatusEnum.NEW);
+
+
+        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+        seg4.setStatus(SegmentStatusEnum.NEW);
+        seg4.setLastBuildJobID("test");
+        seg4.setStorageLocationIdentifier("test");
+
+        CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+        seg5.setStatus(SegmentStatusEnum.READY);
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+
+        mgr.updateCube(cubeBuilder);
+
+
+        mgr.promoteNewlyBuiltSegments(cube, seg4);
+
+        assertTrue(cube.getSegments().size() == 5);
+
+        assertTrue(cube.getSegmentById(seg1.getUuid()) != null && cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg2.getUuid()) != null && cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW);
+        assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg5.getUuid()) != null && cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY);
+
+    }
+
+
+    @Test
+    public void testConcurrentMergeAndMerge() throws Exception {
+        CubeManager mgr = CubeManager.getInstance(getTestConfig());
+        getTestConfig().setMaxBuildingSegments(10);
+        CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+        // no segment at first
+        assertEquals(0, cube.getSegments().size());
+
+        // append first
+        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+        seg1.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+        seg2.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+        seg3.setStatus(SegmentStatusEnum.READY);
+
+        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+        seg4.setStatus(SegmentStatusEnum.READY);
+
+
+
+        CubeSegment merge1 = mgr.mergeSegments(cube, 0, 0, 0, 2000, true);
+        merge1.setStatus(SegmentStatusEnum.NEW);
+        merge1.setLastBuildJobID("test");
+        merge1.setStorageLocationIdentifier("test");
+
+        CubeSegment merge2 = mgr.mergeSegments(cube, 0, 0, 2000, 4000, true);
+        merge2.setStatus(SegmentStatusEnum.NEW);
+        merge2.setLastBuildJobID("test");
+        merge2.setStorageLocationIdentifier("test");
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        mgr.updateCube(cubeBuilder);
+
+
+        mgr.promoteNewlyBuiltSegments(cube, merge1);
+
+        assertTrue(cube.getSegments().size() == 4);
+
+        assertTrue(cube.getSegmentById(seg1.getUuid()) == null);
+        assertTrue(cube.getSegmentById(seg2.getUuid()) == null);
+        assertTrue(cube.getSegmentById(merge1.getUuid()) != null && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
+        assertTrue(cube.getSegmentById(merge2.getUuid()) != null && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW);
+
+    }
+
     @Test
     public void testGetAllCubes() throws Exception {
         final ResourceStore store = ResourceStore.getStore(getTestConfig());


[3/3] kylin git commit: KYLIN-1919 support embedded json format

Posted by sh...@apache.org.
KYLIN-1919 support embedded json format

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa51ce0c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa51ce0c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa51ce0c

Branch: refs/heads/master
Commit: aa51ce0c3382d330ba5418b49eb669c964315f96
Parents: 792d4ee
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 16:04:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |  2 +-
 .../kylin/provision/BuildCubeWithEngine.java    |  2 +-
 .../kylin/provision/BuildCubeWithStream.java    | 16 ++--
 .../java/org/apache/kylin/rest/DebugTomcat.java |  3 +-
 .../kylin/source/kafka/AbstractTimeParser.java  |  4 +-
 .../kylin/source/kafka/DateTimeParser.java      | 40 ++-------
 .../kylin/source/kafka/DefaultTimeParser.java   |  4 +-
 .../kylin/source/kafka/StreamingParser.java     | 41 ++++++++-
 .../source/kafka/StringStreamingParser.java     |  3 +-
 .../source/kafka/TimedJsonStreamParser.java     | 95 ++++++++++++--------
 .../kafka/diagnose/KafkaInputAnalyzer.java      |  6 +-
 11 files changed, 133 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9e9df05..be9b2a9 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -156,7 +156,7 @@ public class DeployUtil {
         for (ColumnDesc columnDesc : tableDesc.getColumns()) {
             tableColumns.add(columnDesc.getRef());
         }
-        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
+        TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null);
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
             List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
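
Passing null here is now safe because the parser falls back to the shared defaults instead of splitting a properties string; the formatTs flag was already ignored by the old parser's switch, so dropping it changes nothing. The fallback, paraphrased from the TimedJsonStreamParser change later in this commit:

    // Paraphrased from TimedJsonStreamParser's new constructor:
    if (properties == null) {
        properties = StreamingParser.defaultProperties;
        // tsColName=timestamp, tsParser=DefaultTimeParser,
        // tsPattern=DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS,
        // separator=_
    }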

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 31cf0eb..971b293 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -84,7 +84,7 @@ public class BuildCubeWithEngine {
             afterClass();
             logger.info("Going to exit");
             System.exit(0);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("error", e);
             System.exit(1);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 58715f1..f8805a6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -234,6 +234,8 @@ public class BuildCubeWithStream {
             segments = cubeManager.getCube(cubeName).getSegments();
             Assert.assertTrue(segments.size() == 1);
         }
+
+        logger.info("Build is done");
     }
 
 
@@ -309,20 +311,22 @@ public class BuildCubeWithStream {
     }
 
     public static void main(String[] args) throws Exception {
+        BuildCubeWithStream buildCubeWithStream = null;
         try {
             beforeClass();
-
-            BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+            buildCubeWithStream = new BuildCubeWithStream();
             buildCubeWithStream.before();
             buildCubeWithStream.build();
-            logger.info("Build is done");
-            buildCubeWithStream.after();
-            afterClass();
             logger.info("Going to exit");
             System.exit(0);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             logger.error("error", e);
             System.exit(1);
+        } finally {
+            if (buildCubeWithStream != null) {
+                buildCubeWithStream.after();
+            }
+            afterClass();
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 7417a05..0f2c500 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -48,7 +48,8 @@ public class DebugTomcat {
                 System.setProperty("catalina.home", ".");
 
             if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-                throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+                System.err.println("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+                System.exit(1);
             }
 
             // workaround for job submission from win to linux -- https://issues.apache.org/jira/browse/MAPREDUCE-4052

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
index 96a4ece..26624ef 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.source.kafka;
 
+import java.util.Map;
+
 /**
  */
 public abstract class AbstractTimeParser {
 
-    public AbstractTimeParser(String[] properties) {
+    public AbstractTimeParser(Map<String, String> properties) {
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
index 2bd699d..3382783 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -18,51 +18,29 @@
 
 package org.apache.kylin.source.kafka;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.kylin.common.util.DateFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.ParseException;
+import java.util.Map;
 
 /**
  */
 public class DateTimeParser extends AbstractTimeParser {
 
     private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
-    private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+    private String tsPattern  = null;
 
     private FastDateFormat formatter = null;
 
     //call by reflection
-    public DateTimeParser(String[] properties) {
+    public DateTimeParser(Map<String, String> properties) {
         super(properties);
-        for (String prop : properties) {
-            try {
-                String[] parts = prop.split("=");
-                if (parts.length == 2) {
-                    switch (parts[0]) {
-                    case "tsPattern":
-                        this.tsPattern = parts[1];
-                        break;
-                    default:
-                        break;
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("Failed to parse property " + prop);
-                //ignore
-            }
-        }
+        tsPattern = properties.get(StreamingParser.PROPERTY_TS_PATTERN);
 
-        if (!StringUtils.isEmpty(tsPattern)) {
-            try {
-                formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
-            } catch (Throwable e) {
-                throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
-            }
-        } else {
+        try {
+            formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+        } catch (Throwable e) {
             throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
         }
     }
@@ -77,8 +55,8 @@ public class DateTimeParser extends AbstractTimeParser {
 
         try {
             return formatter.parse(timeStr).getTime();
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e);
+        } catch (Throwable e) {
+            throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
         }
     }
 }
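
The null/blank guard on tsPattern is gone because parseProperties() (introduced below in StreamingParser) merges the defaults before any parser is constructed, so PROPERTY_TS_PATTERN always has a value. Selecting this parser from a streaming config now looks roughly like the following parserProperties string (the pattern shown is an example, not the default):

    tsParser=org.apache.kylin.source.kafka.DateTimeParser;tsPattern=yyyy-MM-dd HH:mm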

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
index 85f2bfa..4bcd572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -20,11 +20,13 @@ package org.apache.kylin.source.kafka;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.Map;
+
 /**
  */
 public class DefaultTimeParser extends AbstractTimeParser {
 
-    public DefaultTimeParser(String[] properties) {
+    public DefaultTimeParser(Map<String, String> properties) {
         super(properties);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 4d840b8..43b2ac5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -20,8 +20,10 @@ package org.apache.kylin.source.kafka;
 
 import java.lang.reflect.Constructor;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
@@ -30,12 +32,21 @@ import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
+ * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params
  */
 public abstract class StreamingParser {
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamingParser.class);
+    public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
+    public static final String PROPERTY_TS_PARSER = "tsParser";
+    public static final String PROPERTY_TS_PATTERN = "tsPattern";
+    public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+
+    public static final Map<String, String> defaultProperties = Maps.newHashMap();
     public static final Set derivedTimeColumns = Sets.newHashSet();
     static {
         derivedTimeColumns.add("minute_start");
@@ -45,6 +56,10 @@ public abstract class StreamingParser {
         derivedTimeColumns.add("month_start");
         derivedTimeColumns.add("quarter_start");
         derivedTimeColumns.add("year_start");
+        defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
+        defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
+        defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
     }
 
     /**
@@ -57,14 +72,34 @@ public abstract class StreamingParser {
 
     public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
         if (!StringUtils.isEmpty(parserName)) {
+            logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties);
             Class clazz = Class.forName(parserName);
-            Constructor constructor = clazz.getConstructor(List.class, String.class);
-            return (StreamingParser) constructor.newInstance(columns, parserProperties);
+            Map<String, String> properties = parseProperties(parserProperties);
+            Constructor constructor = clazz.getConstructor(List.class, Map.class);
+            return (StreamingParser) constructor.newInstance(columns, properties);
         } else {
             throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
         }
     }
 
+    public static Map<String, String> parseProperties(String propertiesStr) {
+
+        Map<String, String> result = Maps.newHashMap(defaultProperties);
+        if (!StringUtils.isEmpty(propertiesStr)) {
+            String[] properties = propertiesStr.split(";");
+            for (String prop : properties) {
+                String[] parts = prop.split("=");
+                if (parts.length == 2) {
+                    result.put(parts[0], parts[1]);
+                } else {
+                    logger.warn("Ignored invalid property expression '" + prop + "'");
+                }
+            }
+        }
+
+        return result;
+    }
+
     /**
      * Calculate the derived time column value and put to the result list.
      * @param columnName the column name, should be in lower case
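
How a parserProperties string resolves against those defaults (illustrative values):

    Map<String, String> p = StreamingParser.parseProperties("tsColName=ts;separator=.");
    // p.get("tsColName") -> "ts"  and  p.get("separator") -> "."  (overridden)
    // p.get("tsParser")  -> "org.apache.kylin.source.kafka.DefaultTimeParser" (default kept)
    // A malformed entry such as "tsColName=ts=extra" splits into three parts,
    // so it is logged with the new warning and ignored.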

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cea8e0b..f74df83 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -33,7 +34,7 @@ public final class StringStreamingParser extends StreamingParser {
 
     public static final StringStreamingParser instance = new StringStreamingParser(null, null);
 
-    private StringStreamingParser(List<TblColRef> allColumns, String propertiesStr) {
+    private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 2125c05..d4327c5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.source.kafka;
 
@@ -42,7 +42,17 @@ import com.fasterxml.jackson.databind.type.SimpleType;
 import com.google.common.collect.Lists;
 
 /**
- * each json message with a "timestamp" field
+ * A utility class that parses a JSON streaming message into a list of strings (representing a row in a table).
+ *
+ * Each message should have a property whose value is the message's timestamp; the column name defaults to "timestamp"
+ * but can be customized via StreamingParser#PROPERTY_TS_COLUMN_NAME.
+ *
+ * By default the timestamp column value is parsed as Unix time. If the format isn't Unix time, specify a time parser
+ * class with the property StreamingParser#PROPERTY_TS_PARSER.
+ *
+ * Embedded JSON is also supported: use a separator (customizable via StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to
+ * concatenate the nested property names.
+ *
  */
 public final class TimedJsonStreamParser extends StreamingParser {
 
@@ -50,51 +60,34 @@ public final class TimedJsonStreamParser extends StreamingParser {
 
     private List<TblColRef> allColumns;
     private final ObjectMapper mapper;
-    private String tsColName = "timestamp";
-    private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
+    private String tsColName = null;
+    private String tsParser = null;
+    private String separator = null;
+
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
     private AbstractTimeParser streamTimeParser;
 
-    public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+    public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) {
         this.allColumns = allColumns;
-        String[] properties = null;
-        if (!StringUtils.isEmpty(propertiesStr)) {
-            properties = propertiesStr.split(";");
-            for (String prop : properties) {
-                try {
-                    String[] parts = prop.split("=");
-                    if (parts.length == 2) {
-                        switch (parts[0]) {
-                        case "tsColName":
-                            this.tsColName = parts[1];
-                            break;
-                        case "tsParser":
-                            this.tsParser = parts[1];
-                            break;
-                        default:
-                            break;
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("Failed to parse property " + prop);
-                    //ignore
-                }
-            }
+        if (properties == null) {
+            properties = StreamingParser.defaultProperties;
         }
 
-        logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+        tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
+        tsParser = properties.get(PROPERTY_TS_PARSER);
+        separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
 
         if (!StringUtils.isEmpty(tsParser)) {
             try {
                 Class clazz = Class.forName(tsParser);
-                Constructor constructor = clazz.getConstructor(String[].class);
-                streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties);
+                Constructor constructor = clazz.getConstructor(Map.class);
+                streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties);
             } catch (Exception e) {
-                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+                throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
             }
         } else {
-            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+            throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
         }
         mapper = new ObjectMapper();
         mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
@@ -108,7 +101,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
             Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
             Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
-            String tsStr = String.valueOf(root.get(tsColName));
+            String tsStr = objToString(root.get(tsColName));
             long t = streamTimeParser.parseTime(tsStr);
             ArrayList<String> result = Lists.newArrayList();
 
@@ -116,8 +109,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 String columnName = column.getName().toLowerCase();
 
                 if (populateDerivedTimeColumns(columnName, result, t) == false) {
-                    String x = String.valueOf(root.get(columnName));
-                    result.add(x);
+                    result.add(getValueByKey(columnName, root));
                 }
             }
 
@@ -133,4 +125,35 @@ public final class TimedJsonStreamParser extends StreamingParser {
         return true;
     }
 
+    protected String getValueByKey(String key, Map<String, Object> root) throws IOException {
+        if (root.containsKey(key)) {
+            return objToString(root.get(key));
+        }
+
+        if (key.contains(separator)) {
+            String[] names = key.toLowerCase().split(separator);
+            Map<String, Object> tempMap = null;
+            for (int i = 0; i < names.length - 1; i++) {
+                Object o = root.get(names[i]);
+                if (o instanceof Map) {
+                    tempMap = (Map<String, Object>) o;
+                } else {
+                    throw new IOException("Property '" + names[i] + "' is not embedded format");
+                }
+            }
+            Object finalObject = tempMap.get(names[names.length - 1]);
+            return objToString(finalObject);
+
+        }
+
+        return StringUtils.EMPTY;
+    }
+
+    static String objToString(Object value) {
+        if (value == null)
+            return StringUtils.EMPTY;
+
+        return String.valueOf(value);
+    }
+
 }
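
Putting getValueByKey() together with the default separator "_": a cube column named USER_AGE (lower-cased during parse()) can reach into a nested message. An illustrative trace, not a quoted test:

    // message: {"ts": "1475712000000", "user": {"age": "29"}}
    getValueByKey("user_age", root)  // -> "29": walks root -> "user" -> "age"
    getValueByKey("missing", root)   // -> "" : no separator and key absent
    // A separator-bearing key whose first part is not a nested object fails
    // fast with IOException("... is not embedded format").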

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index efaa042..b1b4011 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.diagnose;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -289,8 +291,10 @@ public class KafkaInputAnalyzer extends AbstractApplication {
         String task = optionsHelper.getOptionValue(OPTION_TASK);
         String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
 
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
         kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
-        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), "formatTs=true;tsColName=" + tsColName);
+        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
 
         if ("disorder".equalsIgnoreCase(task)) {
             analyzeDisorder();