You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/17 06:30:28 UTC
[1/2] incubator-kylin git commit: KYLIN-834 optimize StreamingUtil binary search perf
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 606d7deb2 -> b980228bf
KYLIN-834 optimize StreamingUtil binary search perf
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c698de90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c698de90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c698de90
Branch: refs/heads/0.8
Commit: c698de90d95763dc0539f918faaad31eb6810100
Parents: 606d7de
Author: honma <ho...@ebay.com>
Authored: Wed Jun 17 10:42:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 17 10:42:36 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/streaming/StreamingUtil.java | 43 +++++++++++++-------
1 file changed, 28 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c698de90/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 6086a55..160bdfe 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -1,6 +1,7 @@
package org.apache.kylin.streaming;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
@@ -11,6 +12,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.Map;
/**
*/
@@ -53,18 +55,6 @@ public final class StreamingUtil {
throw new IllegalStateException(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId, offset));
}
- public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser) {
- final String topic = kafkaClusterConfig.getTopic();
- logger.debug(String.format("try to get timestamp of topic: %s, partitionId: %d, offset: %d", topic, partitionId, offset));
- final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
- final ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- final ParsedStreamMessage parsedStreamMessage = streamParser.parse(new StreamMessage(messageAndOffset.offset(), bytes));
- logger.debug(String.format("get timestamp:%d", parsedStreamMessage.getTimestamp()));
- return parsedStreamMessage.getTimestamp();
- }
-
public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
final String topic = kafkaClusterConfig.getTopic();
final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
@@ -77,11 +67,13 @@ public final class StreamingUtil {
}
private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
+ Map<Long, Long> cache = Maps.newHashMap();
+
while (startOffset < endOffset) {
long midOffset = startOffset + ((endOffset - startOffset) >> 1);
- long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser);
- long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
- long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser);
+ long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser, cache);
+ long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser, cache);
+ long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser, cache);
// hard to ensure these 2 conditions
// Preconditions.checkArgument(startTimestamp <= midTimestamp);
// Preconditions.checkArgument(midTimestamp <= endTimestamp);
@@ -102,6 +94,27 @@ public final class StreamingUtil {
}
}
return startOffset;
+ }
+
+ private static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser, Map<Long, Long> cache) {
+ if (cache.containsKey(offset)) {
+ return cache.get(offset);
+ } else {
+ long t = getDataTimestamp(kafkaClusterConfig, partitionId, offset, streamParser);
+ cache.put(offset, t);
+ return t;
+ }
+ }
+
+ public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser) {
+ final String topic = kafkaClusterConfig.getTopic();
+ final MessageAndOffset messageAndOffset = getKafkaMessage(kafkaClusterConfig, partitionId, offset);
+ final ByteBuffer payload = messageAndOffset.message().payload();
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ final ParsedStreamMessage parsedStreamMessage = streamParser.parse(new StreamMessage(messageAndOffset.offset(), bytes));
+ logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, parsedStreamMessage.getTimestamp()));
+ return parsedStreamMessage.getTimestamp();
}
}
[2/2] incubator-kylin git commit: Kafka consumer thread factory changed to daemon
Posted by ma...@apache.org.
Kafka consumer thread factory changed to daemon
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b980228b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b980228b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b980228b
Branch: refs/heads/0.8
Commit: b980228bf74b0c5fe6ee1526406ba61ea930d752
Parents: c698de9
Author: honma <ho...@ebay.com>
Authored: Wed Jun 17 11:16:37 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jun 17 11:16:37 2015 +0800
----------------------------------------------------------------------
.../kylin/common/util/DaemonThreadFactory.java | 15 +++++++++++++++
.../kylin/job/streaming/StreamingBootstrap.java | 11 ++++-------
.../apache/kylin/streaming/ITKafkaConsumerTest.java | 3 ++-
3 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b980228b/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java b/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
new file mode 100644
index 0000000..bc4502c
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ */
+public class DaemonThreadFactory implements ThreadFactory {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ return t;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b980228b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ae6b282..6e2c02b 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -43,6 +43,7 @@ import kafka.cluster.Broker;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.cube.CubeInstance;
@@ -160,7 +161,7 @@ public class StreamingBootstrap {
}
logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
- Executors.newSingleThreadExecutor().submit(consumer);
+ Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
result.add(consumer.getStreamQueue(0));
}
return result;
@@ -314,14 +315,10 @@ public class StreamingBootstrap {
final IIDesc iiDesc = iiSegment.getIIDesc();
- Executors.newSingleThreadExecutor().submit(consumer);
+ Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(),
- consumer.getStreamQueue(i % parallelism),
- new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
- 0L,
- iiDesc.getSliceSize());
+ final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(), consumer.getStreamQueue(i % parallelism), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L, iiDesc.getSliceSize());
task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b980228b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
index 4ed7094..ca6fe87 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/ITKafkaConsumerTest.java
@@ -36,6 +36,7 @@ package org.apache.kylin.streaming;
import com.google.common.collect.Lists;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.DaemonThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -87,7 +88,7 @@ public class ITKafkaConsumerTest extends KafkaBaseTest {
@Ignore("since ci does not have the topic")
public void test() throws InterruptedException {
final TopicMeta kafkaTopicMeta = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig);
- final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
+ final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size(), new DaemonThreadFactory());
List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
KafkaConsumer consumer = new KafkaConsumer(0, kafkaTopicMeta.getName(), partitionId, 0, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);