You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/07/01 10:43:03 UTC

[4/4] incubator-kylin git commit: KYLIN-864

KYLIN-864


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1657db8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1657db8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1657db8f

Branch: refs/heads/0.8
Commit: 1657db8f50c641496fabc8d4424ed9b62fd16d33
Parents: 3d83a25
Author: honma <ho...@ebay.com>
Authored: Tue Jun 30 16:30:49 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jul 1 16:42:29 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/debug/BackdoorToggles.java     |  12 +--
 .../kylin/job/streaming/BootstrapConfig.java    |   1 +
 .../kylin/job/streaming/StreamingBootstrap.java |  55 ++++++++--
 .../kylin/job/streaming/StreamingCLI.java       |  43 ++++----
 .../observer/AggregateRegionObserver.java       |  13 ++-
 .../kylin/streaming/MarginCalculator.java       |  83 +++++++++++++++
 .../kylin/streaming/OffsetPeriodCondition.java  |  35 +++++++
 .../apache/kylin/streaming/PartitionMargin.java |  13 +++
 .../streaming/PartitionMarginCalculator.java    | 103 +++++++++++++++++++
 .../apache/kylin/streaming/StreamFetcher.java   |   2 +-
 .../apache/kylin/streaming/StreamingUtil.java   |  24 ++++-
 .../kylin/streaming/TimedJsonStreamParser.java  |   5 -
 12 files changed, 337 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 5ce76aa..560a836 100644
--- a/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -64,9 +64,9 @@ public class BackdoorToggles {
      *
      *
      *
-     example:
+     example:(put it into request body)
      "backdoorToggles": {
-     "DEBUG_TOGGLE_DISABLE_FUZZY_KEY": "true"
+        "DEBUG_TOGGLE_DISABLE_FUZZY_KEY": "true"
      }
      */
     public final static String DEBUG_TOGGLE_DISABLE_FUZZY_KEY = "DEBUG_TOGGLE_DISABLE_FUZZY_KEY";
@@ -74,9 +74,9 @@ public class BackdoorToggles {
     /**
      * set DEBUG_TOGGLE_OBSERVER_BEHAVIOR=SCAN/SCAN_FILTER/SCAN_FILTER_AGGR to control observer behavior for debug/profile usage
      *
-     example:
+     example:(put it into request body)
      "backdoorToggles": {
-     "DEBUG_TOGGLE_OBSERVER_BEHAVIOR": "SCAN"
+        "DEBUG_TOGGLE_OBSERVER_BEHAVIOR": "SCAN"
      }
      */
     public final static String DEBUG_TOGGLE_OBSERVER_BEHAVIOR = "DEBUG_TOGGLE_OBSERVER_BEHAVIOR";
@@ -84,9 +84,9 @@ public class BackdoorToggles {
     /**
      * set DEBUG_TOGGLE_LOCAL_COPROCESSOR=true to run coprocessor at client side (not in HBase region server)
      *
-     example:
+     example:(put it into request body)
      "backdoorToggles": {
-     "DEBUG_TOGGLE_LOCAL_COPROCESSOR": "true"
+        "DEBUG_TOGGLE_LOCAL_COPROCESSOR": "true"
      }
      */
     public final static String DEBUG_TOGGLE_LOCAL_COPROCESSOR = "DEBUG_TOGGLE_LOCAL_COPROCESSOR";

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
index a82fec3..302f455 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -12,6 +12,7 @@ public class BootstrapConfig {
     private long end = 0L;
     private long margin = 0L;
 
+
     public long getMargin() {
         return margin;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 98bbcd4..864062f 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -34,17 +34,24 @@
 
 package org.apache.kylin.job.streaming;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import javax.annotation.Nullable;
+
+import jodd.io.StreamUtil;
 import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -60,12 +67,10 @@ import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  */
@@ -272,6 +277,36 @@ public class StreamingBootstrap {
         logger.info("one off build finished");
     }
 
+    private void startCalculatingMargin(final StreamingConfig streamingConfig) throws Exception {
+        final String cubeName = streamingConfig.getCubeName();
+        final StreamParser streamParser = getStreamParser(streamingConfig, Lists.<TblColRef>newArrayList());
+        final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
+
+        int clusterId = 0;
+        final List<Pair<Long,Long>> firstAndLastOffsets = Lists.newArrayList();
+        // For every Kafka cluster: record the first/last offset of each partition and use the first offset as its start offset.
+        for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
+            final ConcurrentMap<Integer, Long> partitionIdOffsetMap = Maps.newConcurrentMap();
+            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+            for (int i = 0; i < partitionCount; ++i) {
+                Pair<Long,Long> firstlast = StreamingUtil.getFirstAndLastOffset(kafkaClusterConfig,i);
+                firstAndLastOffsets.add(firstlast);
+                partitionIdOffsetMap.putIfAbsent(i,firstlast.getFirst());
+            }
+
+            logger.info("partitionId to start offset map:" + partitionIdOffsetMap);
+            Preconditions.checkArgument(partitionIdOffsetMap.size() == partitionCount, "fail to get all start offset");
+            final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
+            queues.addAll(oneClusterQueue);
+            logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+            clusterId++;
+        }
+        // NOTE(review): firstAndLastOffsets is filled above but never read, and this still submits a OneOffStreamBuilder using startTimestamp/endTimestamp/margin, which are not declared in this method -- TODO confirm the margin calculation was meant to run here.
+        OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
+        Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
+        logger.info("one off build finished");
+    }
+
     private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
 
         List<KafkaClusterConfig> allClustersConfigs = streamingConfig.getKafkaClusterConfigs();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 409c558..1eb23f1 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -34,7 +34,6 @@
 
 package org.apache.kylin.job.streaming;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.cache.RemoteCacheUpdater;
@@ -42,6 +41,8 @@ import org.apache.kylin.common.restclient.AbstractRestCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  */
 public class StreamingCLI {
@@ -60,26 +61,26 @@ public class StreamingCLI {
             while (i < args.length) {
                 String argName = args[i];
                 switch (argName) {
-                    case "-oneoff":
-                        bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
-                        break;
-                    case "-start":
-                        bootstrapConfig.setStart(Long.parseLong(args[++i]));
-                        break;
-                    case "-end":
-                        bootstrapConfig.setEnd(Long.parseLong(args[++i]));
-                        break;
-                    case "-streaming":
-                        bootstrapConfig.setStreaming(args[++i]);
-                        break;
-                    case "-partition":
-                        bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
-                        break;
-                    case "-margin":
-                        bootstrapConfig.setMargin(Long.parseLong(args[++i]));
-                        break;
-                    default:
-                        logger.warn("ignore this arg:" + argName);
+                case "-oneoff":
+                    bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
+                    break;
+                case "-start":
+                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
+                    break;
+                case "-end":
+                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+                    break;
+                case "-streaming":
+                    bootstrapConfig.setStreaming(args[++i]);
+                    break;
+                case "-partition":
+                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+                    break;
+                case "-margin":
+                    bootstrapConfig.setMargin(Long.parseLong(args[++i]));
+                    break;
+                default:
+                    logger.warn("ignore this arg:" + argName);
                 }
                 i++;
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index cf5b8d1..3492f20 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -85,9 +85,14 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
 
         ObserverBehavior observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM;
-        byte[] behavior = scan.getAttribute(BEHAVIOR);
-        if (behavior != null && behavior.length != 0) {
-            observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+        try {
+            byte[] behavior = scan.getAttribute(BEHAVIOR);
+            if (behavior != null && behavior.length != 0) {
+                observerBehavior = ObserverBehavior.valueOf(new String(behavior));
+            }
+        } catch (Exception e) {
+            LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e);
+            observerBehavior = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM;
         }
 
         // start/end region operation & sync on scanner is suggested by the
@@ -102,7 +107,5 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         } finally {
             region.closeRegionOperation();
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java b/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java
new file mode 100644
index 0000000..dc18cc4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MarginCalculator.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.streaming;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Fetches one micro batch per input queue in parallel, unions the results and passes them to the consumer.
+ * NOTE(review): run() uses batchCondition and consumer, which are not declared in this class, while
+ * streaming and firstAndLastOffsets are stored but never read -- TODO confirm the intended wiring.
+ */
+public class MarginCalculator implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MarginCalculator.class);
+
+    private final String streaming;
+    private final List<BlockingQueue<StreamMessage>> queues;
+    private final List<Pair<Long,Long>> firstAndLastOffsets;
+    private StreamParser streamParser;
+
+    public MarginCalculator(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, List<Pair<Long,Long>> firstAndLastOffsets) {
+        Preconditions.checkArgument(queues.size() > 0);
+        this.streaming = streaming;
+        this.queues = queues;
+        this.streamParser = streamParser;
+        this.firstAndLastOffsets = firstAndLastOffsets;
+    }
+
+    @Override
+    public void run() {
+        final int inputCount = queues.size();
+        final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
+        try {
+            final CountDownLatch countDownLatch = new CountDownLatch(inputCount);
+            final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList();
+            int partitionId = 0; // advanced per queue so that every fetcher gets its own id
+            for (BlockingQueue<StreamMessage> queue : queues) {
+                futures.add(executorService.submit(new StreamFetcher(partitionId++, queue, countDownLatch, batchCondition, streamParser)));
+            }
+            countDownLatch.await();
+            List<MicroStreamBatch> batches = Lists.newLinkedList();
+            for (Future<MicroStreamBatch> future : futures) {
+                final MicroStreamBatch fetched = future.get();
+                if (fetched != null) {
+                    batches.add(fetched);
+                } else {
+                    logger.warn("EOF encountered, stop streaming");
+                }
+            }
+            MicroStreamBatch batch = batches.get(0);
+            // bounded by batches.size(): null results are skipped above, so it can be smaller than inputCount
+            for (int i = 1; i < batches.size(); i++) {
+                if (batches.get(i).size() > 0) {
+                    batch = MicroStreamBatch.union(batch, batches.get(i));
+                }
+            }
+            batch.getTimestamp().setFirst(batchCondition.getStartTime());
+            batch.getTimestamp().setSecond(batchCondition.getEndTime());
+            logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(batch.size()), DateFormat.formatToTimeStr(batch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(batch.getTimestamp().getSecond()) });
+            long startTime = System.currentTimeMillis();
+            consumer.consume(batch);
+            logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to whoever runs this task
+            throw new RuntimeException("this thread should not be interrupted", ie);
+        } catch (ExecutionException ee) {
+            logger.error("fetch stream error", ee);
+            throw new RuntimeException(ee);
+        } catch (Exception e) {
+            logger.error("error consume batch", e);
+            throw new RuntimeException("error consume batch", e);
+        } finally {
+            executorService.shutdown(); // the pool is private to this run; release its threads
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java
new file mode 100644
index 0000000..0e4cc3f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OffsetPeriodCondition.java
@@ -0,0 +1,35 @@
+package org.apache.kylin.streaming;
+
+/**
+ * Batch condition that classifies a message by its offset. The accepted
+ * window runs from startOffset (inclusive) to endOffset (exclusive).
+ */
+public class OffsetPeriodCondition implements BatchCondition {
+
+    private final long startOffset;
+    private final long endOffset;
+
+    public OffsetPeriodCondition(long startOffset, long endOffset) {
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public long getEndOffset() {
+        return endOffset;
+    }
+
+    /** Discards messages before startOffset, accepts those below endOffset and rejects the rest. */
+    @Override
+    public Result apply(ParsedStreamMessage message) {
+        final long current = message.getOffset();
+        if (current < startOffset) {
+            return Result.DISCARD;
+        }
+        final boolean insideWindow = current < endOffset;
+        return insideWindow ? Result.ACCEPT : Result.REJECT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java
new file mode 100644
index 0000000..8f68a0a
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMargin.java
@@ -0,0 +1,13 @@
+package org.apache.kylin.streaming;
+
+/** Simple mutable holder for a left and a right margin value, both set by the constructor. */
+public class PartitionMargin {
+    // NOTE(review): units are not stated here; PartitionMarginCalculator passes the same value for both
+    public long leftMargin;
+    public long rightMargin;
+
+    public PartitionMargin(long leftMargin, long rightMargin) {
+        this.leftMargin = leftMargin;
+        this.rightMargin = rightMargin;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java
new file mode 100644
index 0000000..a670ee4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/PartitionMarginCalculator.java
@@ -0,0 +1,103 @@
+package org.apache.kylin.streaming;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Gets the margin for a single partition: the largest difference seen between the earliest and a
+ * later show time of messages that fall into the same second.
+ */
+public class PartitionMarginCalculator implements Callable<PartitionMargin> {
+
+    /**
+     * Uses the WINDOW_SIZE messages prior to a message to represent its show time.
+     */
+    private static class MovingAverage {
+
+        private static final int WINDOW_SIZE = 20;
+
+        private final Queue<Long> q = Lists.newLinkedList();
+        private long totalSum = 0;
+
+        /** Adds e to the window (evicting the oldest element once it is full) and returns the window average. */
+        public long addNewElementAndGetAvg(long e) {
+            if (q.size() >= WINDOW_SIZE) {
+                // drop the evicted value from the running sum too, otherwise every later average is wrong
+                totalSum -= q.remove();
+            }
+            q.add(e);
+            totalSum += e;
+            return totalSum / q.size();
+        }
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(PartitionMarginCalculator.class);
+
+    private final BlockingQueue<StreamMessage> streamMessageQueue;
+    private final CountDownLatch countDownLatch;
+    private final int partitionId;
+    private final long lastOffset;
+    private final StreamParser streamParser;
+
+    //runtime calculations
+    private final HashMap<Long, Long> earliestOffsets = Maps.newHashMap();//for each second in context, record its earliest show time (moving average of message timestamps)
+    private long maxMargin = 0;
+    private final MovingAverage average = new MovingAverage();
+
+    /** lastOffset is the offset at which call() stops; countDownLatch is counted down once when call() exits. */
+    public PartitionMarginCalculator(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long lastOffset, StreamParser streamParser) {
+        this.partitionId = partitionId;
+        this.streamMessageQueue = streamMessageQueue;
+        this.countDownLatch = countDownLatch;
+        this.streamParser = streamParser;
+        this.lastOffset = lastOffset;
+    }
+
+    /** Reads the queue until lastOffset is reached, then returns the max margin as both left and right margin. */
+    @Override
+    public PartitionMargin call() throws Exception {
+        try {
+            while (true) {
+                StreamMessage streamMessage = streamMessageQueue.poll(30, TimeUnit.SECONDS);
+                if (streamMessage == null) {
+                    logger.info("The stream queue for partition {} is drained", partitionId);
+                    continue; // keep polling: only reaching lastOffset (or an error) ends the loop
+                }
+
+                final ParsedStreamMessage parsedStreamMessage = streamParser.parse(streamMessage);
+                if (parsedStreamMessage == null) {
+                    throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
+                }
+
+                if (parsedStreamMessage.getOffset() >= lastOffset) {
+                    logger.info("The final max margin for partition {} is {} ", partitionId, maxMargin);
+                    return new PartitionMargin(maxMargin, maxMargin);
+                }
+
+                long timestamp = parsedStreamMessage.getTimestamp();
+                long wallTime = average.addNewElementAndGetAvg(timestamp);
+                long formalizedTs = timestamp / 1000 * 1000; // truncated to a multiple of 1000, the per-second bucket key
+                if (earliestOffsets.containsKey(formalizedTs)) {
+                    this.maxMargin = Math.max(this.maxMargin, Math.abs(earliestOffsets.get(formalizedTs) - wallTime));
+                } else {
+                    earliestOffsets.put(formalizedTs, wallTime);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("partition margin calculation stream error, stopping", e);
+            throw new RuntimeException("partition margin calculation stream error, stopping", e);
+        } finally {
+            logger.info("one partition sign off");
+            countDownLatch.countDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index afe6d79..38af7a1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -10,7 +10,7 @@ import java.util.concurrent.CountDownLatch;
 /**
  */
 public class StreamFetcher implements Callable<MicroStreamBatch> {
-    private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+    private static final Logger logger = LoggerFactory.getLogger(StreamFetcher.class);
 
     private final BlockingQueue<StreamMessage> streamMessageQueue;
     private final CountDownLatch countDownLatch;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 160bdfe..264c384 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -7,6 +7,7 @@ import kafka.cluster.Broker;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
 import kafka.message.MessageAndOffset;
+import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,14 +57,21 @@ public final class StreamingUtil {
     }
 
     public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
+        Pair<Long,Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
+        final String topic = kafkaClusterConfig.getTopic();
+
+        logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));
+        final long result = binarySearch(kafkaClusterConfig, partitionId, firstAndLast.getFirst(), firstAndLast.getSecond(), timestamp, streamParser);
+        logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
+        return result;
+    }
+
+    public static Pair<Long, Long> getFirstAndLastOffset(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
         final String topic = kafkaClusterConfig.getTopic();
         final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
         final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
         final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig) - 1;
-        logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
-        final long result = binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
-        logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
-        return result;
+        return Pair.newPair(earliestOffset, latestOffset);
     }
 
     private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
@@ -117,4 +125,12 @@ public final class StreamingUtil {
         return parsedStreamMessage.getTimestamp();
 
     }
+
+    public static void main(String[] args) {
+        if (args == null || args.length == 0) {
+            return; // no command given: stop here instead of dereferencing args[0] below
+        }
+        if ("calculatemargin".equals(args[0])) { // NOTE(review): no action is wired to this command yet
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1657db8f/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
index cba52e8..908aa85 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimedJsonStreamParser.java
@@ -65,11 +65,6 @@ public final class TimedJsonStreamParser implements StreamParser {
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
 
-    public TimedJsonStreamParser(List<TblColRef> allColumns) {
-        this.allColumns = allColumns;
-        this.formatTs = false;
-    }
-
     public TimedJsonStreamParser(List<TblColRef> allColumns, boolean formatTs) {
         this.allColumns = allColumns;
         this.formatTs = formatTs;