You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/10/06 06:46:09 UTC
[1/3] kylin git commit: KYLIN-2059 remove the time threshold on storage cleanup
Repository: kylin
Updated Branches:
refs/heads/master 792d4ee3f -> 95ebc8512
KYLIN-2059 remove the time threshold on storage cleanup
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/95ebc851
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/95ebc851
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/95ebc851
Branch: refs/heads/master
Commit: 95ebc851274953023ba092a249c1ddbbee5cd845
Parents: 334c2e0
Author: shaofengshi <sh...@apache.org>
Authored: Fri Sep 30 09:13:03 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfigBase.java | 4 ----
.../kylin/engine/mr/steps/MetadataCleanupJob.java | 2 +-
.../kylin/storage/hbase/util/StorageCleanupJob.java | 15 ++++-----------
3 files changed, 5 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/95ebc851/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4d1639b..99c3c5a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -780,10 +780,6 @@ abstract public class KylinConfigBase implements Serializable {
return Long.parseLong(getOptional("kylin.query.hbase.hconnection.threads.alive.seconds", "60"));
}
- public long getStorageCleanupTimeThreshold() {
- return Long.valueOf(this.getOptional("kylin.storage.cleanup.time.threshold", "172800000")); //default two days
- }
-
public int getAppendDictEntrySize() {
return Integer.parseInt(getOptional("kylin.dict.append.entry.size", "10000000"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/95ebc851/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
index 962697e..f2b1d6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
@@ -54,7 +54,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
private KylinConfig config = null;
- public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000L; // 2 days
+ public static final long TIME_THREADSHOLD = 1 * 3600 * 1000L; // 1 hour
public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000L; // 30 days
/*
http://git-wip-us.apache.org/repos/asf/kylin/blob/95ebc851/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index dffce36..2c2f11c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -34,7 +34,6 @@ import java.util.regex.Pattern;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -63,10 +62,10 @@ public class StorageCleanupJob extends AbstractApplication {
@SuppressWarnings("static-access")
protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
- protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force");
+ protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete all kylin intermediate hive tables").create("force");
protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
- public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute
+ public static final int deleteTimeout = 10; // Unit minute
protected boolean delete = false;
protected boolean force = false;
@@ -74,7 +73,6 @@ public class StorageCleanupJob extends AbstractApplication {
private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- long TIME_THREADSHOLD = KylinConfig.getInstanceFromEnv().getStorageCleanupTimeThreshold();
// get all kylin hbase tables
HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
@@ -82,14 +80,9 @@ public class StorageCleanupJob extends AbstractApplication {
List<String> allTablesNeedToBeDropped = new ArrayList<String>();
for (HTableDescriptor desc : tableDescriptors) {
String host = desc.getValue(IRealizationConstants.HTableTag);
- String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
//only take care htables that belongs to self, and created more than 2 days
- if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) {
allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
- } else {
- logger.info("Exclude table " + desc.getTableName().getNameAsString() + " from drop list, as it is newly created");
- }
}
}
@@ -111,9 +104,9 @@ public class StorageCleanupJob extends AbstractApplication {
FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName));
executorService.execute(futureTask);
try {
- futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES);
+ futureTask.get(deleteTimeout, TimeUnit.MINUTES);
} catch (TimeoutException e) {
- logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!");
+ logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + deleteTimeout + " minutes!");
futureTask.cancel(true);
} catch (Exception e) {
e.printStackTrace();
[2/3] kylin git commit: KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()
Posted by sh...@apache.org.
KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/334c2e09
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/334c2e09
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/334c2e09
Branch: refs/heads/master
Commit: 334c2e09ba443b8fdcb7d4bfe08ab8fbc0ac3fbe
Parents: aa51ce0
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 22:56:00 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 4 +
.../java/org/apache/kylin/cube/CubeManager.java | 77 ++++++++-------
.../org/apache/kylin/cube/CubeManagerTest.java | 99 ++++++++++++++++++++
3 files changed, 145 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 838ef97..4d1639b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -809,4 +809,8 @@ abstract public class KylinConfigBase implements Serializable {
public int getMaxBuildingSegments() {
return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2"));
}
+
+ public void setMaxBuildingSegments(int maxBuildingSegments) {
+ setProperty("kylin.cube.building.segment.max", String.valueOf(maxBuildingSegments));
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 962568c..d243f4d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -135,7 +135,7 @@ public class CubeManager implements IRealizationProvider {
logger.info("Initializing CubeManager with config " + config);
this.config = config;
this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
-
+
// touch lower level metadata before registering my listener
loadAllCubeInstance();
Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
@@ -159,12 +159,12 @@ public class CubeManager implements IRealizationProvider {
@Override
public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
String cubeName = cacheKey;
-
+
if (event == Event.DROP)
removeCubeLocal(cubeName);
else
reloadCubeLocal(cubeName);
-
+
for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, cubeName)) {
broadcaster.notifyProjectDataUpdate(prj.getName());
}
@@ -615,7 +615,6 @@ public class CubeManager implements IRealizationProvider {
return max;
}
-
private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
List<CubeSegment> existing = cube.getSegments();
if (existing.isEmpty()) {
@@ -625,7 +624,6 @@ public class CubeManager implements IRealizationProvider {
}
}
-
private long calculateStartDateForAppendSegment(CubeInstance cube) {
List<CubeSegment> existing = cube.getSegments();
if (existing.isEmpty()) {
@@ -728,7 +726,7 @@ public class CubeManager implements IRealizationProvider {
List<CubeSegment> mergingSegs = Lists.newArrayList();
if (buildingSegs.size() > 0) {
-
+
for (CubeSegment building : buildingSegs) {
// exclude those under-merging segs
for (CubeSegment ready : readySegs) {
@@ -760,27 +758,22 @@ public class CubeManager implements IRealizationProvider {
return null;
}
- public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
- List<CubeSegment> tobe = calculateToBeSegments(cube);
+ public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+ if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
- for (CubeSegment seg : newSegments) {
- if (tobe.contains(seg) == false)
- throw new IllegalStateException("For cube " + cube + ", segment " + seg + " is expected but not in the tobe " + tobe);
+ if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
- if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
- throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+ if (isReady(newSegment) == true)
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
- if (StringUtils.isBlank(seg.getLastBuildJobID()))
- throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+ List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment);
- seg.setStatus(SegmentStatusEnum.READY);
- }
+ if (tobe.contains(newSegment) == false)
+ throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
- for (CubeSegment seg : tobe) {
- if (isReady(seg) == false) {
- logger.warn("For cube " + cube + ", segment " + seg + " isn't READY yet.");
- }
- }
+ newSegment.setStatus(SegmentStatusEnum.READY);
List<CubeSegment> toRemoveSegs = Lists.newArrayList();
for (CubeSegment segment : cube.getSegments()) {
@@ -788,14 +781,14 @@ public class CubeManager implements IRealizationProvider {
toRemoveSegs.add(segment);
}
- logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(newSegments) + ", to remove segments " + toRemoveSegs);
+ logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegments).setStatus(RealizationStatusEnum.READY);
+ cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
updateCube(cubeBuilder);
}
- public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+ public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
List<CubeSegment> newList = Arrays.asList(newSegments);
if (tobe.containsAll(newList) == false) {
@@ -809,11 +802,12 @@ public class CubeManager implements IRealizationProvider {
* - Favors new segments over the old
* - Favors big segments over the small
*/
- private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... newSegments) {
+ private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) {
List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
- if (newSegments != null)
- tobe.addAll(Arrays.asList(newSegments));
+ if (newSegments != null && !tobe.contains(newSegments)) {
+ tobe.add(newSegments);
+ }
if (tobe.size() == 0)
return tobe;
@@ -849,13 +843,17 @@ public class CubeManager implements IRealizationProvider {
} else {
tobe.remove(j);
}
- } else if (isNew(is)) {
- // otherwise, favor the new segment
- tobe.remove(j);
+ continue;
} else {
- tobe.remove(i);
+ // otherwise, favor the new segment
+ if (isNew(is) && is.equals(newSegments)) {
+ tobe.remove(j);
+ continue;
+ } else if (js.equals(newSegments)) {
+ tobe.remove(i);
+ continue;
+ }
}
- continue;
}
// if i, j in sequence
@@ -865,8 +863,17 @@ public class CubeManager implements IRealizationProvider {
continue;
}
- // seems j not fitting
- tobe.remove(j);
+ // js can be covered by is
+ if (is.equals(newSegments)) {
+ // seems j not fitting
+ tobe.remove(j);
+ continue;
+ } else {
+ i++;
+ j++;
+ continue;
+ }
+
}
return tobe;
http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 49bb128..e63fe99 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.kylin.cube;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -123,6 +124,104 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
}
+
+ @Test
+ public void testConcurrentBuildAndMerge() throws Exception {
+ CubeManager mgr = CubeManager.getInstance(getTestConfig());
+ CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+ getTestConfig().setMaxBuildingSegments(10);
+ // no segment at first
+ assertEquals(0, cube.getSegments().size());
+
+ // append first
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+ seg1.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+ seg2.setStatus(SegmentStatusEnum.READY);
+
+
+ CubeSegment seg3 = mgr.mergeSegments(cube, 0, 0, 0000, 2000, true);
+ seg3.setStatus(SegmentStatusEnum.NEW);
+
+
+ CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+ seg4.setStatus(SegmentStatusEnum.NEW);
+ seg4.setLastBuildJobID("test");
+ seg4.setStorageLocationIdentifier("test");
+
+ CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+ seg5.setStatus(SegmentStatusEnum.READY);
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+
+ mgr.updateCube(cubeBuilder);
+
+
+ mgr.promoteNewlyBuiltSegments(cube, seg4);
+
+ assertTrue(cube.getSegments().size() == 5);
+
+ assertTrue(cube.getSegmentById(seg1.getUuid()) != null && cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg2.getUuid()) != null && cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW);
+ assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg5.getUuid()) != null && cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY);
+
+ }
+
+
+ @Test
+ public void testConcurrentMergeAndMerge() throws Exception {
+ CubeManager mgr = CubeManager.getInstance(getTestConfig());
+ getTestConfig().setMaxBuildingSegments(10);
+ CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
+
+ // no segment at first
+ assertEquals(0, cube.getSegments().size());
+
+ // append first
+ CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000);
+ seg1.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000);
+ seg2.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000);
+ seg3.setStatus(SegmentStatusEnum.READY);
+
+ CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000);
+ seg4.setStatus(SegmentStatusEnum.READY);
+
+
+
+ CubeSegment merge1 = mgr.mergeSegments(cube, 0, 0, 0, 2000, true);
+ merge1.setStatus(SegmentStatusEnum.NEW);
+ merge1.setLastBuildJobID("test");
+ merge1.setStorageLocationIdentifier("test");
+
+ CubeSegment merge2 = mgr.mergeSegments(cube, 0, 0, 2000, 4000, true);
+ merge2.setStatus(SegmentStatusEnum.NEW);
+ merge2.setLastBuildJobID("test");
+ merge2.setStorageLocationIdentifier("test");
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ mgr.updateCube(cubeBuilder);
+
+
+ mgr.promoteNewlyBuiltSegments(cube, merge1);
+
+ assertTrue(cube.getSegments().size() == 4);
+
+ assertTrue(cube.getSegmentById(seg1.getUuid()) == null);
+ assertTrue(cube.getSegmentById(seg2.getUuid()) == null);
+ assertTrue(cube.getSegmentById(merge1.getUuid()) != null && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY);
+ assertTrue(cube.getSegmentById(merge2.getUuid()) != null && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW);
+
+ }
+
@Test
public void testGetAllCubes() throws Exception {
final ResourceStore store = ResourceStore.getStore(getTestConfig());
[3/3] kylin git commit: KYLIN-1919 support embedded json format
Posted by sh...@apache.org.
KYLIN-1919 support embedded json format
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa51ce0c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa51ce0c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa51ce0c
Branch: refs/heads/master
Commit: aa51ce0c3382d330ba5418b49eb669c964315f96
Parents: 792d4ee
Author: shaofengshi <sh...@apache.org>
Authored: Thu Sep 29 16:04:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Oct 6 14:44:05 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 2 +-
.../kylin/provision/BuildCubeWithEngine.java | 2 +-
.../kylin/provision/BuildCubeWithStream.java | 16 ++--
.../java/org/apache/kylin/rest/DebugTomcat.java | 3 +-
.../kylin/source/kafka/AbstractTimeParser.java | 4 +-
.../kylin/source/kafka/DateTimeParser.java | 40 ++-------
.../kylin/source/kafka/DefaultTimeParser.java | 4 +-
.../kylin/source/kafka/StreamingParser.java | 41 ++++++++-
.../source/kafka/StringStreamingParser.java | 3 +-
.../source/kafka/TimedJsonStreamParser.java | 95 ++++++++++++--------
.../kafka/diagnose/KafkaInputAnalyzer.java | 6 +-
11 files changed, 133 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 9e9df05..be9b2a9 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -156,7 +156,7 @@ public class DeployUtil {
for (ColumnDesc columnDesc : tableDesc.getColumns()) {
tableColumns.add(columnDesc.getRef());
}
- TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
+ TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null);
StringBuilder sb = new StringBuilder();
for (String json : data) {
List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 31cf0eb..971b293 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -84,7 +84,7 @@ public class BuildCubeWithEngine {
afterClass();
logger.info("Going to exit");
System.exit(0);
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("error", e);
System.exit(1);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 58715f1..f8805a6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -234,6 +234,8 @@ public class BuildCubeWithStream {
segments = cubeManager.getCube(cubeName).getSegments();
Assert.assertTrue(segments.size() == 1);
}
+
+ logger.info("Build is done");
}
@@ -309,20 +311,22 @@ public class BuildCubeWithStream {
}
public static void main(String[] args) throws Exception {
+ BuildCubeWithStream buildCubeWithStream = null;
try {
beforeClass();
-
- BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream();
+ buildCubeWithStream = new BuildCubeWithStream();
buildCubeWithStream.before();
buildCubeWithStream.build();
- logger.info("Build is done");
- buildCubeWithStream.after();
- afterClass();
logger.info("Going to exit");
System.exit(0);
- } catch (Exception e) {
+ } catch (Throwable e) {
logger.error("error", e);
System.exit(1);
+ } finally {
+ if (buildCubeWithStream != null) {
+ buildCubeWithStream.after();
+ }
+ afterClass();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index 7417a05..0f2c500 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -48,7 +48,8 @@ public class DebugTomcat {
System.setProperty("catalina.home", ".");
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
- throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+ System.err.println("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+ System.exit(1);
}
// workaround for job submission from win to linux -- https://issues.apache.org/jira/browse/MAPREDUCE-4052
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
index 96a4ece..26624ef 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java
@@ -18,11 +18,13 @@
package org.apache.kylin.source.kafka;
+import java.util.Map;
+
/**
*/
public abstract class AbstractTimeParser {
- public AbstractTimeParser(String[] properties) {
+ public AbstractTimeParser(Map<String, String> properties) {
}
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
index 2bd699d..3382783 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java
@@ -18,51 +18,29 @@
package org.apache.kylin.source.kafka;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.kylin.common.util.DateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.ParseException;
+import java.util.Map;
/**
*/
public class DateTimeParser extends AbstractTimeParser {
private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class);
- private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS;
+ private String tsPattern = null;
private FastDateFormat formatter = null;
//call by reflection
- public DateTimeParser(String[] properties) {
+ public DateTimeParser(Map<String, String> properties) {
super(properties);
- for (String prop : properties) {
- try {
- String[] parts = prop.split("=");
- if (parts.length == 2) {
- switch (parts[0]) {
- case "tsPattern":
- this.tsPattern = parts[1];
- break;
- default:
- break;
- }
- }
- } catch (Exception e) {
- logger.error("Failed to parse property " + prop);
- //ignore
- }
- }
+ tsPattern = properties.get(StreamingParser.PROPERTY_TS_PATTERN);
- if (!StringUtils.isEmpty(tsPattern)) {
- try {
- formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
- } catch (Throwable e) {
- throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
- }
- } else {
+ try {
+ formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern);
+ } catch (Throwable e) {
throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'.");
}
}
@@ -77,8 +55,8 @@ public class DateTimeParser extends AbstractTimeParser {
try {
return formatter.parse(timeStr).getTime();
- } catch (ParseException e) {
- throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e);
+ } catch (Throwable e) {
+ throw new IllegalArgumentException("Invalid value: pattern: '" + tsPattern + "', value: '" + timeStr + "'", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
index 85f2bfa..4bcd572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java
@@ -20,11 +20,13 @@ package org.apache.kylin.source.kafka;
import org.apache.commons.lang3.StringUtils;
+import java.util.Map;
+
/**
*/
public class DefaultTimeParser extends AbstractTimeParser {
- public DefaultTimeParser(String[] properties) {
+ public DefaultTimeParser(Map<String, String> properties) {
super(properties);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 4d840b8..43b2ac5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -20,8 +20,10 @@ package org.apache.kylin.source.kafka;
import java.lang.reflect.Constructor;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import java.nio.ByteBuffer;
import org.apache.kylin.common.util.DateFormat;
@@ -30,12 +32,21 @@ import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
+ * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, Map properties) as params
*/
public abstract class StreamingParser {
+ private static final Logger logger = LoggerFactory.getLogger(StreamingParser.class);
+ public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
+ public static final String PROPERTY_TS_PARSER = "tsParser";
+ public static final String PROPERTY_TS_PATTERN = "tsPattern";
+ public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+
+ public static final Map<String, String> defaultProperties = Maps.newHashMap();
public static final Set derivedTimeColumns = Sets.newHashSet();
static {
derivedTimeColumns.add("minute_start");
@@ -45,6 +56,10 @@ public abstract class StreamingParser {
derivedTimeColumns.add("month_start");
derivedTimeColumns.add("quarter_start");
derivedTimeColumns.add("year_start");
+ defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
+ defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
+ defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+ defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
}
/**
@@ -57,14 +72,34 @@ public abstract class StreamingParser {
public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
if (!StringUtils.isEmpty(parserName)) {
+ logger.info("Construct StreamingParse {} with properties {}", parserName, parserProperties);
Class clazz = Class.forName(parserName);
- Constructor constructor = clazz.getConstructor(List.class, String.class);
- return (StreamingParser) constructor.newInstance(columns, parserProperties);
+ Map<String, String> properties = parseProperties(parserProperties);
+ Constructor constructor = clazz.getConstructor(List.class, Map.class);
+ return (StreamingParser) constructor.newInstance(columns, properties);
} else {
throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
}
}
+ public static Map<String, String> parseProperties(String propertiesStr) {
+
+ Map<String, String> result = Maps.newHashMap(defaultProperties);
+ if (!StringUtils.isEmpty(propertiesStr)) {
+ String[] properties = propertiesStr.split(";");
+ for (String prop : properties) {
+ String[] parts = prop.split("=");
+ if (parts.length == 2) {
+ result.put(parts[0], parts[1]);
+ } else {
+ logger.warn("Ignored invalid property expression '" + prop + "'");
+ }
+ }
+ }
+
+ return result;
+ }
+
/**
* Calculate the derived time column value and put to the result list.
* @param columnName the column name, should be in lower case
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cea8e0b..f74df83 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
@@ -33,7 +34,7 @@ public final class StringStreamingParser extends StreamingParser {
public static final StringStreamingParser instance = new StringStreamingParser(null, null);
- private StringStreamingParser(List<TblColRef> allColumns, String propertiesStr) {
+ private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 2125c05..d4327c5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.kafka;
@@ -42,7 +42,17 @@ import com.fasterxml.jackson.databind.type.SimpleType;
import com.google.common.collect.Lists;
/**
- * each json message with a "timestamp" field
+ * A utility class which parses a JSON streaming message into a list of strings (representing a row in a table).
+ *
+ * Each message should have a property whose value represents the message's timestamp; by default the column name is "timestamp",
+ * but it can be customized with the property StreamingParser#PROPERTY_TS_COLUMN_NAME.
+ *
+ * By default it will parse the timestamp column value as Unix time. If the format isn't Unix time, you need to specify the time parser
+ * with the property StreamingParser#PROPERTY_TS_PARSER.
+ *
+ * It also supports embedded (nested) JSON objects; use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concatenate
+ * the property names.
+ *
*/
public final class TimedJsonStreamParser extends StreamingParser {
@@ -50,51 +60,34 @@ public final class TimedJsonStreamParser extends StreamingParser {
private List<TblColRef> allColumns;
private final ObjectMapper mapper;
- private String tsColName = "timestamp";
- private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser";
+ private String tsColName = null;
+ private String tsParser = null;
+ private String separator = null;
+
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
private AbstractTimeParser streamTimeParser;
- public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+ public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) {
this.allColumns = allColumns;
- String[] properties = null;
- if (!StringUtils.isEmpty(propertiesStr)) {
- properties = propertiesStr.split(";");
- for (String prop : properties) {
- try {
- String[] parts = prop.split("=");
- if (parts.length == 2) {
- switch (parts[0]) {
- case "tsColName":
- this.tsColName = parts[1];
- break;
- case "tsParser":
- this.tsParser = parts[1];
- break;
- default:
- break;
- }
- }
- } catch (Exception e) {
- logger.error("Failed to parse property " + prop);
- //ignore
- }
- }
+ if (properties == null) {
+ properties = StreamingParser.defaultProperties;
}
- logger.info("TimedJsonStreamParser with tsColName {}", tsColName);
+ tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
+ tsParser = properties.get(PROPERTY_TS_PARSER);
+ separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
if (!StringUtils.isEmpty(tsParser)) {
try {
Class clazz = Class.forName(tsParser);
- Constructor constructor = clazz.getConstructor(String[].class);
- streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties);
+ Constructor constructor = clazz.getConstructor(Map.class);
+ streamTimeParser = (AbstractTimeParser) constructor.newInstance(properties);
} catch (Exception e) {
- throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e);
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".", e);
}
} else {
- throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".");
+ throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + properties + ".");
}
mapper = new ObjectMapper();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
@@ -108,7 +101,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
root.putAll(message);
- String tsStr = String.valueOf(root.get(tsColName));
+ String tsStr = objToString(root.get(tsColName));
long t = streamTimeParser.parseTime(tsStr);
ArrayList<String> result = Lists.newArrayList();
@@ -116,8 +109,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
String columnName = column.getName().toLowerCase();
if (populateDerivedTimeColumns(columnName, result, t) == false) {
- String x = String.valueOf(root.get(columnName));
- result.add(x);
+ result.add(getValueByKey(columnName, root));
}
}
@@ -133,4 +125,35 @@ public final class TimedJsonStreamParser extends StreamingParser {
return true;
}
+ protected String getValueByKey(String key, Map<String, Object> root) throws IOException {
+ if (root.containsKey(key)) {
+ return objToString(root.get(key));
+ }
+
+ if (key.contains(separator)) {
+ String[] names = key.toLowerCase().split(separator);
+ Map<String, Object> tempMap = null;
+ for (int i = 0; i < names.length - 1; i++) {
+ Object o = root.get(names[i]);
+ if (o instanceof Map) {
+ tempMap = (Map<String, Object>) o;
+ } else {
+ throw new IOException("Property '" + names[i] + "' is not embedded format");
+ }
+ }
+ Object finalObject = tempMap.get(names[names.length - 1]);
+ return objToString(finalObject);
+
+ }
+
+ return StringUtils.EMPTY;
+ }
+
+ static String objToString(Object value) {
+ if (value == null)
+ return StringUtils.EMPTY;
+
+ return String.valueOf(value);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/aa51ce0c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index efaa042..b1b4011 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.kylin.source.kafka.diagnose;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -28,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -289,8 +291,10 @@ public class KafkaInputAnalyzer extends AbstractApplication {
String task = optionsHelper.getOptionValue(OPTION_TASK);
String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(StreamingParser.PROPERTY_TS_COLUMN_NAME, tsColName);
kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
- parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), "formatTs=true;tsColName=" + tsColName);
+ parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), properties);
if ("disorder".equalsIgnoreCase(task)) {
analyzeDisorder();