Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/17 06:30:28 UTC

[1/2] incubator-kylin git commit: KYLIN-834 optimize StreamingUtil binary search perf

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 606d7deb2 -> b980228bf


KYLIN-834 optimize StreamingUtil binary search perf


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c698de90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c698de90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c698de90

Branch: refs/heads/0.8
Commit: c698de90d95763dc0539f918faaad31eb6810100
Parents: 606d7de
Author: honma <ho...@ebay.com>
Authored: Wed Jun 17 10:42:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 17 10:42:36 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/streaming/StreamingUtil.java   | 43 +++++++++++++-------
 1 file changed, 28 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c698de90/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 6086a55..160bdfe 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -1,6 +1,7 @@
 package org.apache.kylin.streaming;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.javaapi.FetchResponse;
@@ -11,6 +12,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  */
@@ -53,18 +55,6 @@ public final class StreamingUtil {
         throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset));
     }
 
-    public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser) {
-        final String topic = kafkaClusterConfig.getTopic();
-        logger.debug(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d", topic, partitionId, offset));
-        final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
-        final ByteBuffer payload = messageAndOffset.message().payload();
-        byte[] bytes = new byte[payload.limit()];
-        payload.get(bytes);
-        final ParsedStreamMessage parsedStreamMessage = streamParser.parse(new StreamMessage(messageAndOffset.offset(), bytes));
-        logger.debug(String.format("get timestamp:%d", parsedStreamMessage.getTimestamp()));
-        return parsedStreamMessage.getTimestamp();
-    }
-
     public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
         final String topic = kafkaClusterConfig.getTopic();
         final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
@@ -77,11 +67,13 @@ public final class StreamingUtil {
     }
 
     private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
+        Map<Long, Long> cache = Maps.newHashMap();
+
         while (startOffset < endOffset) {
             long midOffset = startOffset + ((endOffset - startOffset) >> 1);
-            long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser);
-            long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
-            long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser);
+            long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser, cache);
+            long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser, cache);
+            long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser, cache);
             // hard to ensure these 2 conditions
             //            Preconditions.checkArgument(startTimestamp <= midTimestamp);
             //            Preconditions.checkArgument(midTimestamp <= endTimestamp);
@@ -102,6 +94,27 @@ public final class StreamingUtil {
             }
         }
         return startOffset;
+    }
+
+    private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser, Map<Long, Long> cache) {
+        if (cache.containsKey(offset)) {
+            return cache.get(offset);
+        } else {
+            long t = getDataTimestamp(kafkaClusterConfig, partitionId, offset, streamParser);
+            cache.put(offset, t);
+            return t;
+        }
+    }
+
+    public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser) {
+        final String topic = kafkaClusterConfig.getTopic();
+        final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
+        final ByteBuffer payload = messageAndOffset.message().payload();
+        byte[] bytes = new byte[payload.limit()];
+        payload.get(bytes);
+        final ParsedStreamMessage parsedStreamMessage = streamParser.parse(new StreamMessage(messageAndOffset.offset(), bytes));
+        logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, parsedStreamMessage.getTimestamp()));
+        return parsedStreamMessage.getTimestamp();
 
     }
 }
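
A note on the optimization: binarySearch calls getDataTimestamp for the
start, end, and mid offsets on every iteration, and each call is a full
Kafka fetch. After the range is halved, two of those three offsets repeat
in the next iteration, so a per-search cache avoids roughly two thirds of
the fetches. The sketch below is illustrative only: MemoizedOffsetSearch
and fetchTimestamp are hypothetical stand-ins for the Kafka round trip,
and the branch logic is a plausible reconstruction, not the committed
code (the commit's branching is elided from this diff).

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.LongUnaryOperator;

    public class MemoizedOffsetSearch {

        // Finds the first offset in [startOffset, endOffset] whose timestamp
        // reaches targetTimestamp, assuming timestamps grow (roughly)
        // monotonically with offset, as the commented-out Preconditions in
        // the commit suggest.
        public static long findClosestOffset(long startOffset, long endOffset,
                long targetTimestamp, LongUnaryOperator fetchTimestamp) {
            Map<Long, Long> cache = new HashMap<>();
            while (startOffset < endOffset) {
                long midOffset = startOffset + ((endOffset - startOffset) >> 1);
                // Without the cache, two of these three lookups repeat work
                // from the previous iteration; with it, each distinct offset
                // costs exactly one fetch per search.
                long startTs = cachedTimestamp(startOffset, fetchTimestamp, cache);
                long endTs = cachedTimestamp(endOffset, fetchTimestamp, cache);
                long midTs = cachedTimestamp(midOffset, fetchTimestamp, cache);
                if (targetTimestamp <= startTs) {
                    return startOffset;
                }
                if (targetTimestamp >= endTs) {
                    return endOffset;
                }
                if (targetTimestamp <= midTs) {
                    endOffset = midOffset;        // answer is at or below mid
                } else {
                    startOffset = midOffset + 1;  // answer is strictly above mid
                }
            }
            return startOffset;
        }

        private static long cachedTimestamp(long offset, LongUnaryOperator fetch,
                Map<Long, Long> cache) {
            // computeIfAbsent runs the expensive lookup at most once per offset
            return cache.computeIfAbsent(offset, fetch::applyAsLong);
        }
    }

Note the cache is scoped to a single search, mirroring the commit: offsets
from one search are useless to the next, and an unbounded shared cache
would leak.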


[2/2] incubator-kylin git commit: Kafka consumer thread factory changed to daemon

Posted by ma...@apache.org.
Kafka consumer thread factory changed to daemon


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b980228b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b980228b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b980228b

Branch: refs/heads/0.8
Commit: b980228bf74b0c5fe6ee1526406ba61ea930d752
Parents: c698de9
Author: honma <ho...@ebay.com>
Authored: Wed Jun 17 11:16:37 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 17 11:16:37 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/util/DaemonThreadFactory.java       | 15 +++++++++++++++
 .../kylin/job/streaming/StreamingBootstrap.java      | 11 ++++-------
 .../apache/kylin/streaming/ITKafkaConsumerTest.java  |  3 ++-
 3 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b980228b/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java b/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
new file mode 100644
index 0000000..bc4502c
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ */
+public class DaemonThreadFactory implements ThreadFactory {
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = Executors.defaultThreadFactory().newThread(r);
+        t.setDaemon(true);
+        return t;
+    }
+}
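
For context on why the factory matters: the default thread factory used by
Executors creates non-daemon threads, so a consumer task that never returns
keeps the JVM alive after main exits, which is what the switch below in
StreamingBootstrap and ITKafkaConsumerTest addresses. A minimal sketch
(DaemonFactoryDemo is hypothetical; it assumes only the DaemonThreadFactory
added above):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kylin.common.util.DaemonThreadFactory;

    public class DaemonFactoryDemo {
        public static void main(String[] args) {
            // Threads from DaemonThreadFactory are daemons, so the JVM exits
            // when main returns even though the worker never finishes.
            ExecutorService pool =
                    Executors.newSingleThreadExecutor(new DaemonThreadFactory());
            pool.submit(() -> {
                while (true) {
                    try {
                        Thread.sleep(1000); // stand-in for a blocking consumer poll
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            System.out.println("main returns; the daemon worker cannot block shutdown");
        }
    }

With the default factory this program would hang after main returns; with
the daemon factory it terminates cleanly.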

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b980228b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ae6b282..6e2c02b 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -43,6 +43,7 @@ import kafka.cluster.Broker;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.cube.CubeInstance;
@@ -160,7 +161,7 @@ public class StreamingBootstrap {
             }
             logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
             KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-            Executors.newSingleThreadExecutor().submit(consumer);
+            Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
             result.add(consumer.getStreamQueue(0));
         }
         return result;
@@ -314,14 +315,10 @@ public class StreamingBootstrap {
 
         final IIDesc iiDesc = iiSegment.getIIDesc();
 
-        Executors.newSingleThreadExecutor().submit(consumer);
+        Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(),
-                    consumer.getStreamQueue(i % parallelism),
-                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
-                    0L,
-                    iiDesc.getSliceSize());
+            final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(), consumer.getStreamQueue(i % parallelism), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L, iiDesc.getSliceSize());
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b980228b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
index 4ed7094..ca6fe87 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -36,6 +36,7 @@ package org.apache.kylin.streaming;
 
 import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -87,7 +88,7 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
     @Ignore("since ci does not have the topic")
     public void test() throws InterruptedException {
         final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig);
-        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+        final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size(), new DaemonThreadFactory());
         List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
         for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
             KafkaConsumer consumer = new KafkaConsumer(0, kafkaTopicMeta.getName(), partitionId, 0, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);