You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this rendering.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/04 07:53:04 UTC

incubator-kylin git commit: KYLIN-808 fix bugs

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 da381a4a8 -> 07a264e6d


KYLIN-808 fix bugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/07a264e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/07a264e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/07a264e6

Branch: refs/heads/0.8.0
Commit: 07a264e6d89ce282d309c5dfdf87aa6bf37158bd
Parents: da381a4
Author: honma <ho...@ebay.com>
Authored: Thu Jun 4 13:52:51 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Jun 4 13:52:51 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/TimeUtil.java  |  4 ++++
 .../org/apache/kylin/common/util/BasicTest.java |  5 +++--
 .../kylin/job/streaming/CubeStreamConsumer.java | 10 ++++-----
 .../kylin/job/streaming/StreamingBootstrap.java | 16 ++++++++------
 .../apache/kylin/streaming/StreamBuilder.java   | 22 +++++++++++---------
 5 files changed, 34 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index 0aa58e4..c79e88b 100644
--- a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -22,4 +22,8 @@ public class TimeUtil {
     public static long getDayStart(long ts) {
         return ts / ONE_DAY_TS * ONE_DAY_TS;
     }
+
+    public static long getNextPeriodStart(long ts, long period) {
+        return ((ts + period - 1) / period) * period;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index daab36f..3bc4fda 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -77,8 +77,9 @@ public class BasicTest {
     @Test
     @Ignore("convenient trial tool for dev")
     public void test1() throws Exception {
-        System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1432083600000L));
-        System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-05-14 17:00:00"));
+        System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250418000L));
+        System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L));
+        System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-02 13:05:00"));
         System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-05-15 17:00:00"));
 
         String bb = "\\x00\\x00\\x00\\x00\\x01\\x3F\\xD0\\x2D\\58\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00";//2013/07/12 07:59:37

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 18373a9..6c90209 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -19,10 +19,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.*;
 import org.apache.kylin.cube.CubeBuilder;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -86,8 +83,11 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
     @Override
     public void consume(MicroStreamBatch microStreamBatch) throws Exception {
         if (microStreamBatch.size() == 0) {
-            logger.info("nothing to build, skip to next iteration");
+            logger.info("nothing to build, skip to next iteration after sleeping 10s");
+            Thread.sleep(10000);
             return;
+        } else {
+            logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(microStreamBatch.size()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(microStreamBatch.getTimestamp().getSecond()) });
         }
 
         totalConsumedMessageCount += microStreamBatch.size();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index b7ed60c..f45ec5f 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -43,6 +43,7 @@ import kafka.javaapi.PartitionMetadata;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.invertedindex.IIInstance;
@@ -58,7 +59,10 @@ import org.slf4j.LoggerFactory;
 import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /**
  */
@@ -159,7 +163,10 @@ public class StreamingBootstrap {
         final String cubeName = streamingConfig.getCubeName();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
 
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, new MicroBatchCondition(Integer.MAX_VALUE, 5 * 60 * 1000), new CubeStreamConsumer(cubeName), cubeInstance.getDateRangeEnd());
+        int batchInterval = 5 * 60 * 1000;
+        MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
+        long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
         cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
@@ -230,10 +237,7 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism),
-                    new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
-                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
-                    0L);
+            final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism), new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/07a264e6/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cda6209..f188990 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -160,7 +160,7 @@ public class StreamBuilder implements Runnable {
                         logger.warn("stream queue should not be interrupted", e);
                         return null;
                     }
-                    if (System.currentTimeMillis() - t <= timeout) {
+                    if (System.currentTimeMillis() - t > timeout) {
                         break;
                     }
                 }
@@ -192,24 +192,26 @@ public class StreamBuilder implements Runnable {
                         return null;
                     }
 
+                    microStreamBatch.incRawMessageCount();
                     final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
-                    final long timestamp = parsedStreamMessage.getTimestamp();
-                    if (timestamp < startTimestamp) {
-                        streamMessageQueue.take();
-                    } else if (timestamp < endTimestamp) {
-                        streamMessageQueue.take();
-                        microStreamBatch.incRawMessageCount();
-                        if (getStreamFilter().filter(parsedStreamMessage)) {
+                    if (getStreamFilter().filter(parsedStreamMessage)) {
+                        final long timestamp = parsedStreamMessage.getTimestamp();
+                        if (timestamp < startTimestamp) {
+                            //TODO properly handle late megs
+                            streamMessageQueue.take();
+                        } else if (timestamp < endTimestamp) {
+                            streamMessageQueue.take();
                             if (microStreamBatch.size() >= condition.getBatchSize()) {
                                 return microStreamBatch;
                             } else {
                                 microStreamBatch.add(parsedStreamMessage);
                             }
                         } else {
-                            //ignore unfiltered stream message
+                            return microStreamBatch;
                         }
                     } else {
-                        return microStreamBatch;
+                        //ignore unfiltered stream message
+                        streamMessageQueue.take();
                     }
                 }
             } catch (Exception e) {