You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text copy.)
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/07/01 10:43:03 UTC
[4/4] incubator-kylin git commit: KYLIN-864
KYLIN-864
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1657db8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1657db8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1657db8f
Branch: refs/heads/0.8
Commit: 1657db8f50c641496fabc8d4424ed9b62fd16d33
Parents: 3d83a25
Author: honma <ho...@ebay.com>
Authored: Tue Jun 30 16:30:49 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jul 1 16:42:29 2015 +0800
----------------------------------------------------------------------
.../kylin/common/debug/BackdoorToggles.java | 12 +--
.../kylin/job/streaming/BootstrapConfig.java | 1 +
.../kylin/job/streaming/StreamingBootstrap.java | 55 ++++++++--
.../kylin/job/streaming/StreamingCLI.java | 43 ++++----
.../observer/AggregateRegionObserver.java | 13 ++-
.../kylin/streaming/MarginCalculator.java | 83 +++++++++++++++
.../kylin/streaming/OffsetPeriodCondition.java | 35 +++++++
.../apache/kylin/streaming/PartitionMargin.java | 13 +++
.../streaming/PartitionMarginCalculator.java | 103 +++++++++++++++++++
.../apache/kylin/streaming/StreamFetcher.java | 2 +-
.../apache/kylin/streaming/StreamingUtil.java | 24 ++++-
.../kylin/streaming/TimedJsonStreamParser.java | 5 -
12 files changed, 337 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 5ce76aa..560a836 100644
--- a/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -64,9 +64,9 @@ public class BackdoorToggles {
*
*
*
- example:
+ example:(put it into request body)
"backdoorToggles": {
- "DEBUG_TOGGLE_DISABLE_FUZZY_KEY": "true"
+ "DEBUG_TOGGLE_DISABLE_FUZZY_KEY": "true"
}
*/
public final static String DEBUG_TOGGLE_DISABLE_FUZZY_KEY = "DEBUG_TOGGLE_DISABLE_FUZZY_KEY";
@@ -74,9 +74,9 @@ public class BackdoorToggles {
/**
* set DEBUG_TOGGLE_OBSERVER_BEHAVIOR=SCAN/SCAN_FILTER/SCAN_FILTER_AGGR to control observer behavior for debug/profile usage
*
- example:
+ example:(put it into request body)
"backdoorToggles": {
- "DEBUG_TOGGLE_OBSERVER_BEHAVIOR": "SCAN"
+ "DEBUG_TOGGLE_OBSERVER_BEHAVIOR": "SCAN"
}
*/
public final static String DEBUG_TOGGLE_OBSERVER_BEHAVIOR = "DEBUG_TOGGLE_OBSERVER_BEHAVIOR";
@@ -84,9 +84,9 @@ public class BackdoorToggles {
/**
* set DEBUG_TOGGLE_LOCAL_COPROCESSOR=true to run coprocessor at client side (not in HBase region server)
*
- example:
+ example:(put it into request body)
"backdoorToggles": {
- "DEBUG_TOGGLE_LOCAL_COPROCESSOR": "true"
+ "DEBUG_TOGGLE_LOCAL_COPROCESSOR": "true"
}
*/
public final static String DEBUG_TOGGLE_LOCAL_COPROCESSOR = "DEBUG_TOGGLE_LOCAL_COPROCESSOR";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
index a82fec3..302f455 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -12,6 +12,7 @@ public class BootstrapConfig {
private long end = 0L;
private long margin = 0L;
+
public long getMargin() {
return margin;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 98bbcd4..864062f 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -34,17 +34,24 @@
package org.apache.kylin.job.streaming;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import javax.annotation.Nullable;
+
+import jodd.io.StreamUtil;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -60,12 +67,10 @@ import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
*/
@@ -272,6 +277,36 @@ public class StreamingBootstrap {
logger.info("one off build finished");
}
+ private void startCalculatingMargin(final StreamingConfig streamingConfig) throws Exception {
+ final String cubeName = streamingConfig.getCubeName();
+ final StreamParser streamParser = getStreamParser(streamingConfig, Lists.<TblColRef>newArrayList());
+ final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
+
+ int clusterId = 0;
+ final List<Pair<Long,Long>> firstAndLastOffsets = Lists.newArrayList();
+
+ for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
+ final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ for (int i = 0; i < partitionCount; ++i) {
+ Pair<Long,Long> firstlast = StreamingUtil.getFirstAndLastOffset(kafkaClusterConfig,i);
+ firstAndLastOffsets.add(firstlast);
+ partitionIdOffsetMap.putIfAbsent(i,firstlast.getFirst());
+ }
+
+ logger.info("partitionId to start offset map:" + partitionIdOffsetMap);
+ Preconditions.checkArgument(partitionIdOffsetMap.size() == partitionCount, "fail to get all start offset");
+ final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
+ queues.addAll(oneClusterQueue);
+ logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+ clusterId++;
+ }
+
+ OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
+ Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
+ logger.info("one off build finished");
+ }
+
private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
List<KafkaClusterConfig> allClustersConfigs = streamingConfig.getKafkaClusterConfigs();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 409c558..1eb23f1 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -34,7 +34,6 @@
package org.apache.kylin.job.streaming;
-import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.cache.RemoteCacheUpdater;
@@ -42,6 +41,8 @@ import org.apache.kylin.common.restclient.AbstractRestCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
*/
public class StreamingCLI {
@@ -60,26 +61,26 @@ public class StreamingCLI {
while (i < args.length) {
String argName = args[i];
switch (argName) {
- case "-oneoff":
- bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
- break;
- case "-start":
- bootstrapConfig.setStart(Long.parseLong(args[++i]));
- break;
- case "-end":
- bootstrapConfig.setEnd(Long.parseLong(args[++i]));
- break;
- case "-streaming":
- bootstrapConfig.setStreaming(args[++i]);
- break;
- case "-partition":
- bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
- break;
- case "-margin":
- bootstrapConfig.setMargin(Long.parseLong(args[++i]));
- break;
- default:
- logger.warn("ignore this arg:" + argName);
+ case "-oneoff":
+ bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
+ break;
+ case "-start":
+ bootstrapConfig.setStart(Long.parseLong(args[++i]));
+ break;
+ case "-end":
+ bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+ break;
+ case "-streaming":
+ bootstrapConfig.setStreaming(args[++i]);
+ break;
+ case "-partition":
+ bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+ break;
+ case "-margin":
+ bootstrapConfig.setMargin(Long.parseLong(args[++i]));
+ break;
+ default:
+ logger.warn("ignore this arg:" + argName);
}
i++;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index cf5b8d1..3492f20 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -85,9 +85,14 @@ public class AggregateRegionObserver extends BaseRegionObserver {
CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM;
- byte[] behavior = scan.getAttribute(BEHAVIOR);
- if (behavior != null && behavior.length != 0) {
- observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+ try {
+ byte[] behavior = scan.getAttribute(BEHAVIOR);
+ if (behavior != null && behavior.length != 0) {
+ observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+ }
+ } catch (Exception e) {
+ LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e);
+ observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM;
}
// start/end region operation & sync on scanner is suggested by the
@@ -102,7 +107,5 @@ public class AggregateRegionObserver extends BaseRegionObserver {
} finally {
region.closeRegionOperation();
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java b/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java
new file mode 100644
index 0000000..dc18cc4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.streaming;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class MarginCalculator implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(MarginCalculator.class);
+
+
+ private final String streaming;
+ private final List<BlockingQueue<StreamMessage>> queues;
+ private final List<Pair<Long,Long>> firstAndLastOffsets;
+ private StreamParser streamParser;
+
+ public MarginCalculator(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, List<Pair<Long,Long>> firstAndLastOffsets) {
+ Preconditions.checkArgument(queues.size() > 0);
+ this.streaming = streaming;
+ this.queues = queues;
+ this.streamParser = streamParser;
+ this.firstAndLastOffsets = firstAndLastOffsets;
+ }
+
+ @Override
+ public void run() {
+ try {
+ final int inputCount = queues.size();
+ final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
+ final CountDownLatch countDownLatch = new CountDownLatch(inputCount);
+ final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList();
+ int partitionId = 0;
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ futures.add(executorService.submit(new StreamFetcher(partitionId, queue, countDownLatch, batchCondition, streamParser)));
+ }
+ countDownLatch.await();
+ List<MicroStreamBatch> batches = Lists.newLinkedList();
+ for (Future<MicroStreamBatch> future : futures) {
+ if (future.get() != null) {
+ batches.add(future.get());
+ } else {
+ logger.warn("EOF encountered, stop streaming");
+ }
+ }
+
+ MicroStreamBatch batch = batches.get(0);
+ if (batches.size() > 1) {
+ for (int i = 1; i < inputCount; i++) {
+ if (batches.get(i).size() > 0) {
+ batch = MicroStreamBatch.union(batch, batches.get(i));
+ }
+ }
+ }
+ batch.getTimestamp().setFirst(batchCondition.getStartTime());
+ batch.getTimestamp().setSecond(batchCondition.getEndTime());
+
+ logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(batch.size()), DateFormat.formatToTimeStr(batch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(batch.getTimestamp().getSecond()) });
+ long startTime = System.currentTimeMillis();
+ consumer.consume(batch);
+ logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("this thread should not be interrupted", ie);
+ } catch (ExecutionException ee) {
+ logger.error("fetch stream error", ee);
+ throw new RuntimeException(ee);
+ } catch (Exception e) {
+ logger.error("error consume batch", e);
+ throw new RuntimeException("error consume batch", e);
+ }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java
new file mode 100644
index 0000000..0e4cc3f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java
@@ -0,0 +1,35 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class OffsetPeriodCondition implements BatchCondition {
+
+ private final long startOffset;
+ private final long endOffset;
+
+ public OffsetPeriodCondition(long startOffset, long endOffset) {
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getEndOffset() {
+ return endOffset;
+ }
+
+ @Override
+ public Result apply(ParsedStreamMessage message) {
+ final long offset = message.getOffset();
+ if (offset < startOffset) {
+ return Result.DISCARD;
+ } else if (offset < endOffset) {
+ return Result.ACCEPT;
+ } else {
+ return Result.REJECT;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java
new file mode 100644
index 0000000..8f68a0a
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java
@@ -0,0 +1,13 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class PartitionMargin {
+ public long leftMargin;
+ public long rightMargin;
+
+ public PartitionMargin(long leftMargin, long rightMargin) {
+ this.leftMargin = leftMargin;
+ this.rightMargin = rightMargin;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java
new file mode 100644
index 0000000..a670ee4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java
@@ -0,0 +1,103 @@
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * get the margin for a single partition
+ */
+public class PartitionMarginCalculator implements Callable<PartitionMargin> {
+
+ /**
+ * use the WINDOW_SIZE messages prior to a message to represent its show time
+ */
+ private class MovingAverage {
+
+ private static final int WINDOW_SIZE = 20;
+
+ private Queue<Long> q = Lists.newLinkedList();
+ private long totalSum = 0;
+
+ public long addNewElementAndGetAvg(long e) {
+ if (q.size() < WINDOW_SIZE) {
+ q.add(e);
+ totalSum += e;
+ return totalSum / q.size();
+ }
+
+ long head = q.remove();
+ q.add(e);
+ return (totalSum - head + e) / WINDOW_SIZE;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(PartitionMarginCalculator.class);
+
+ private final BlockingQueue<StreamMessage> streamMessageQueue;
+ private final CountDownLatch countDownLatch;
+ private final int partitionId;
+ private final long lastOffset;
+ private final StreamParser streamParser;
+
+ //runtime calculations
+ private final HashMap<Long, Long> earliestOffsets = Maps.newHashMap();//for each second in context, record its earliest show offset in kafka
+ private long maxMargin = 0;
+ private final MovingAverage average = new MovingAverage();
+
+ public PartitionMarginCalculator(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long lastOffset, StreamParser streamParser) {
+ this.partitionId = partitionId;
+ this.streamMessageQueue = streamMessageQueue;
+ this.countDownLatch = countDownLatch;
+ this.streamParser = streamParser;
+ this.lastOffset = lastOffset;
+ }
+
+ @Override
+ public PartitionMargin call() throws Exception {
+ try {
+ while (true) {
+ StreamMessage streamMessage = streamMessageQueue.poll(30, TimeUnit.SECONDS);
+ if (streamMessage == null) {
+ logger.info("The stream queue for partition {} is drained", partitionId);
+ continue;
+ }
+
+ final ParsedStreamMessage parsedStreamMessage = streamParser.parse(streamMessage);
+ if (parsedStreamMessage == null) {
+ throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
+ }
+
+ if (parsedStreamMessage.getOffset() >= lastOffset) {
+ logger.info("The final max margin for partition {} is {} ", partitionId, maxMargin);
+ return new PartitionMargin(maxMargin, maxMargin);
+ }
+
+ long timestamp = parsedStreamMessage.getTimestamp();
+ long wallTime = average.addNewElementAndGetAvg(timestamp);
+ long formalizedTs = timestamp / 1000 * 1000;
+
+ if (earliestOffsets.containsKey(formalizedTs)) {
+ this.maxMargin = Math.max(this.maxMargin, Math.abs(earliestOffsets.get(formalizedTs) - wallTime));
+ } else {
+ earliestOffsets.put(formalizedTs, wallTime);
+ }
+
+ }
+ } catch (Exception e) {
+ logger.error("partition margin calculation stream error, stopping", e);
+ throw new RuntimeException("partition margin calculation stream error, stopping", e);
+ } finally {
+ logger.info("one partition sign off");
+ countDownLatch.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index afe6d79..38af7a1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -10,7 +10,7 @@ import java.util.concurrent.CountDownLatch;
/**
*/
public class StreamFetcher implements Callable<MicroStreamBatch> {
- private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+ private static final Logger logger = LoggerFactory.getLogger(StreamFetcher.class);
private final BlockingQueue<StreamMessage> streamMessageQueue;
private final CountDownLatch countDownLatch;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 160bdfe..264c384 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -7,6 +7,7 @@ import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
+import org.apache.kylin.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +57,21 @@ public final class StreamingUtil {
}
public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
+ Pair<Long,Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
+ final String topic = kafkaClusterConfig.getTopic();
+
+ logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));
+ final long result = binarySearch(kafkaClusterConfig, partitionId, firstAndLast.getFirst(), firstAndLast.getSecond(), timestamp, streamParser);
+ logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
+ return result;
+ }
+
+ public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
final String topic = kafkaClusterConfig.getTopic();
final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
- logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
- final long result = binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
- logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
- return result;
+ return Pair.newPair(earliestOffset, latestOffset);
}
private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
@@ -117,4 +125,12 @@ public final class StreamingUtil {
return parsedStreamMessage.getTimestamp();
}
+
+ public static void main(String[] args) {
+ if (args == null || args.length == 0) {
+ }
+
+ if ("calculatemargin".equals(args[0])) {
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
index cba52e8..908aa85 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
@@ -65,11 +65,6 @@ public final class TimedJsonStreamParser implements StreamParser {
private final ObjectMapper mapper = new ObjectMapper();
private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
- public TimedJsonStreamParser(List<TblColRef> allColumns) {
- this.allColumns = allColumns;
- this.formatTs = false;
- }
-
public TimedJsonStreamParser(List<TblColRef> allColumns, boolean formatTs) {
this.allColumns = allColumns;
this.formatTs = formatTs;