You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/04 07:53:04 UTC
incubator-kylin git commit: KYLIN-808 fix bugs
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 da381a4a8 -> 07a264e6d
KYLIN-808 fix bugs
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/07a264e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/07a264e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/07a264e6
Branch: refs/heads/0.8.0
Commit: 07a264e6d89ce282d309c5dfdf87aa6bf37158bd
Parents: da381a4
Author: honma <ho...@ebay.com>
Authored: Thu Jun 4 13:52:51 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 4 13:52:51 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/TimeUtil.java | 4 ++++
.../org/apache/kylin/common/util/BasicTest.java | 5 +++--
.../kylin/job/streaming/CubeStreamConsumer.java | 10 ++++-----
.../kylin/job/streaming/StreamingBootstrap.java | 16 ++++++++------
.../apache/kylin/streaming/StreamBuilder.java | 22 +++++++++++---------
5 files changed, 34 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index 0aa58e4..c79e88b 100644
--- a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -22,4 +22,8 @@ public class TimeUtil {
public static long getDayStart(long ts) {
return ts / ONE_DAY_TS * ONE_DAY_TS;
}
+
+ public static long getNextPeriodStart(long ts, long period) {
+ return ((ts + period - 1) / period) * period;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index daab36f..3bc4fda 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -77,8 +77,9 @@ public class BasicTest {
@Test
@Ignore("convenient trial tool for dev")
public void test1() throws Exception {
- System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1432083600000L));
- System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-05-14 17:00:00"));
+ System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250418000L));
+ System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L));
+ System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-02 13:05:00"));
System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-05-15 17:00:00"));
String bb = "\\x00\\x00\\x00\\x00\\x01\\x3F\\xD0\\x2D\\58\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00";//2013/07/12 07:59:37
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 18373a9..6c90209 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -19,10 +19,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.*;
import org.apache.kylin.cube.CubeBuilder;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -86,8 +83,11 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
@Override
public void consume(MicroStreamBatch microStreamBatch) throws Exception {
if (microStreamBatch.size() == 0) {
- logger.info("nothing to build, skip to next iteration");
+ logger.info("nothing to build, skip to next iteration after sleeping 10s");
+ Thread.sleep(10000);
return;
+ } else {
+ logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(microStreamBatch.size()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getSecond()) });
}
totalConsumedMessageCount += microStreamBatch.size();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index b7ed60c..f45ec5f 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -43,6 +43,7 @@ import kafka.javaapi.PartitionMetadata;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.invertedindex.IIInstance;
@@ -58,7 +59,10 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/**
*/
@@ -159,7 +163,10 @@ public class StreamingBootstrap {
final String cubeName = streamingConfig.getCubeName();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
- StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, new MicroBatchCondition(Integer.MAX_VALUE, 5 * 60 * 1000), new CubeStreamConsumer(cubeName), cubeInstance.getDateRangeEnd());
+ int batchInterval = 5 * 60 * 1000;
+ MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
+ long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
+ StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
@@ -230,10 +237,7 @@ public class StreamingBootstrap {
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism),
- new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
- new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
- 0L);
+ final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism), new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cda6209..f188990 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -160,7 +160,7 @@ public class StreamBuilder implements Runnable {
logger.warn("stream queue should not be interrupted", e);
return null;
}
- if (System.currentTimeMillis() - t <= timeout) {
+ if (System.currentTimeMillis() - t > timeout) {
break;
}
}
@@ -192,24 +192,26 @@ public class StreamBuilder implements Runnable {
return null;
}
+ microStreamBatch.incRawMessageCount();
final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
- final long timestamp = parsedStreamMessage.getTimestamp();
- if (timestamp < startTimestamp) {
- streamMessageQueue.take();
- } else if (timestamp < endTimestamp) {
- streamMessageQueue.take();
- microStreamBatch.incRawMessageCount();
- if (getStreamFilter().filter(parsedStreamMessage)) {
+ if (getStreamFilter().filter(parsedStreamMessage)) {
+ final long timestamp = parsedStreamMessage.getTimestamp();
+ if (timestamp < startTimestamp) {
+ //TODO properly handle late msgs
+ streamMessageQueue.take();
+ } else if (timestamp < endTimestamp) {
+ streamMessageQueue.take();
if (microStreamBatch.size() >= condition.getBatchSize()) {
return microStreamBatch;
} else {
microStreamBatch.add(parsedStreamMessage);
}
} else {
- //ignore unfiltered stream message
+ return microStreamBatch;
}
} else {
- return microStreamBatch;
+ //ignore unfiltered stream message
+ streamMessageQueue.take();
}
}
} catch (Exception e) {