You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/11 10:14:12 UTC

incubator-kylin git commit: KYLIN-1023 kylin streaming log start end offset for each partition for data verification

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging d6ffa6058 -> 6d22f3fe7


KYLIN-1023 kylin streaming log start end offset for each partition for data verification


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6d22f3fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6d22f3fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6d22f3fe

Branch: refs/heads/2.x-staging
Commit: 6d22f3fe77706a9fcc27ac6ab20c047e5e872de5
Parents: d6ffa60
Author: honma <ho...@ebay.com>
Authored: Fri Sep 11 16:16:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Sep 11 16:16:26 2015 +0800

----------------------------------------------------------------------
 .../kylin/engine/streaming/StreamingCLI.java    |   2 -
 .../org/apache/kylin/job/tools/KafkaVerify.java | 101 +++++++++++++++++++
 .../kylin/streaming/MicroStreamBatch.java       |   4 +
 .../kylin/streaming/OneOffStreamBuilder.java    |   2 +-
 .../apache/kylin/streaming/StreamFetcher.java   |  19 ++--
 .../apache/kylin/streaming/StreamingConfig.java |   2 +-
 6 files changed, 119 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
index 277ee69..8bf52c1 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
@@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-/**
- */
 public class StreamingCLI {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
new file mode 100644
index 0000000..ee64e66
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+
+/**
+ * Offline check of Kylin streaming correctness: aggregates messages from the original Kafka topic, read from a local file (assumed one JSON object per line, as parsed below), so the totals can be compared with Kylin's results.
+ */
+public class KafkaVerify {
+    /** Usage: KafkaVerify start end interval file - prints per-bucket "qty" sums and record counts for records whose "sys_ts" lies in [start, end). */
+    public static void main(String[] args) throws IOException {
+        // args: [0] start ts (inclusive), [1] end ts (exclusive), [2] bucket width, [3] input file path; unchecked, so missing args throw ArrayIndexOutOfBoundsException.
+        System.out.println("start");
+        // Every line is deserialized as a flat String -> String JSON map.
+        ObjectMapper mapper = new ObjectMapper();
+        JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+        // NOTE(review): MapType.construct/SimpleType.construct are deprecated in later Jackson 2.x releases; mapper.getTypeFactory().constructMapType(...) is the usual replacement - confirm against the Jackson version in use.
+        long start = Long.valueOf(args[0]);
+        long end = Long.valueOf(args[1]);
+        long interval = Long.valueOf(args[2]);
+        int bucket = (int) ((end - start + interval - 1) / interval); // ceiling division: a partial last bucket still gets a slot
+        // NOTE(review): no validation - interval == 0 throws ArithmeticException, and end far enough below start yields a negative bucket count (NegativeArraySizeException).
+        long qtySum[] = new long[bucket]; // per-bucket sum of "qty"
+        long qtyTotal = 0;
+        long counts[] = new long[bucket]; // per-bucket number of counted records
+        long countTotal = 0;
+        long processed = 0; // lines read so far, including skipped ones
+        long minOffset = -1; // 0-based line index of the first counted record; -1 if none (presumably the Kafka offset if the dump starts at offset 0 - verify)
+        long maxOffset = -1; // 0-based line index of the last counted record; -1 if none
+        // NOTE(review): FileReader decodes with the platform default charset; non-ASCII content may be misread unless that default is UTF-8.
+        try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
+            String s;
+            while ((s = br.readLine()) != null) {
+                // process the line; progress is printed on lines 1, 10001, 20001, ...
+                if (++processed % 10000 == 1) {
+                    System.out.println("processing " + processed);
+                }
+                // NOTE(review): a malformed JSON line makes readValue throw, which aborts the whole run.
+                Map<String, String> root = mapper.readValue(s, mapType);
+                String tsStr = root.get("sys_ts");
+                // Lines without a "sys_ts" value are skipped (they still advance "processed").
+                if (StringUtils.isEmpty(tsStr)) {
+                    continue;
+                }
+                long ts = Long.valueOf(tsStr);
+                if (ts < start || ts >= end) { // keep only records in [start, end)
+                    continue;
+                }
+                // Remember the 0-based line index of the first and last record inside the window.
+                if (minOffset == -1) {
+                    minOffset = processed - 1;
+                }
+                maxOffset = processed - 1;
+                // NOTE(review): a missing or non-numeric "qty" (like a non-numeric "sys_ts" above) throws NumberFormatException and aborts the run.
+                long qty = Long.valueOf(root.get("qty"));
+                int index = (int) ((ts - start) / interval);
+                qtySum[index] += qty;
+                qtyTotal += qty;
+                counts[index]++;
+                countTotal++;
+            }
+        }
+        // Report per-bucket and total aggregates plus the line range that contributed.
+        System.out.println("qty sum is " + Arrays.toString(qtySum));
+        System.out.println("qty total is " + qtyTotal);
+        System.out.println("count is " + Arrays.toString(counts));
+        System.out.println("count total is " + countTotal);
+        System.out.println("first processed is " + minOffset);
+        System.out.println("last processed is " + maxOffset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index f4d9e05..27f817e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -62,6 +62,10 @@ public final class MicroStreamBatch {
         return this.rawMessageCount;
     }
 
+    public final int getFilteredMessageCount() { // size of "streams" - presumably the parsed messages accepted into this batch, as opposed to getRawMessageCount()
+        return this.streams.size();
+    }
+
     public final void add(ParsedStreamMessage parsedStreamMessage) {
         if (offset.getFirst() > parsedStreamMessage.getOffset()) {
             offset.setFirst(parsedStreamMessage.getOffset());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
index 927873a..ae0f70f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
@@ -45,7 +45,7 @@ public class OneOffStreamBuilder implements Runnable {
             final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList();
             int partitionId = 0;
             for (BlockingQueue<StreamMessage> queue : queues) {
-                futures.add(executorService.submit(new StreamFetcher(partitionId, queue, countDownLatch, batchCondition, streamParser)));
+                futures.add(executorService.submit(new StreamFetcher(partitionId++, queue, countDownLatch, batchCondition, streamParser)));
             }
             countDownLatch.await();
             List<MicroStreamBatch> batches = Lists.newLinkedList();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index 85d09be..f429a49 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -7,6 +7,8 @@ import java.util.concurrent.CountDownLatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  */
 public class StreamFetcher implements Callable<MicroStreamBatch> {
@@ -26,9 +28,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
         this.streamParser = streamParser;
     }
 
-    private void clearCounter() {
-    }
-
     private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
         long t = System.currentTimeMillis();
         while (true) {
@@ -57,7 +56,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
             while (true) {
                 if (microStreamBatch == null) {
                     microStreamBatch = new MicroStreamBatch(partitionId);
-                    clearCounter();
                 }
                 StreamMessage streamMessage = peek(streamMessageQueue, 60000);
                 if (streamMessage == null) {
@@ -83,21 +81,28 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
                     } else if (result == BatchCondition.Result.LAST_ACCEPT_FOR_BATCH) {
                         streamMessageQueue.take();
                         microStreamBatch.add(parsedStreamMessage);
-                        return microStreamBatch;
+                        break;
                     } else if (result == BatchCondition.Result.DISCARD) {
                         streamMessageQueue.take();
                     } else if (result == BatchCondition.Result.REJECT) {
-                        return microStreamBatch;
+                        logger.info("Partition :" + partitionId + " rejecting message at " + parsedStreamMessage.getOffset());
+                        break;
                     }
                 } else {
                     streamMessageQueue.take();
                 }
             }
+
+            Preconditions.checkArgument(microStreamBatch != null, "microStreamBatch is null!");
+            logger.info(String.format("Partition %d contributing %d filtered messages out from %d raw messages"//
+                    , partitionId, microStreamBatch.getFilteredMessageCount(), microStreamBatch.getRawMessageCount()));
+            return microStreamBatch;
+
         } catch (Exception e) {
             logger.error("build stream error, stop building", e);
             throw new RuntimeException("build stream error, stop building", e);
         } finally {
-            logger.info("one partition sign off");
+            logger.info("partition {} sign off", partitionId);
             countDownLatch.countDown();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
index 320768b..c2d5361 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
@@ -84,7 +84,7 @@ public class StreamingConfig extends RootPersistentEntity {
     @JsonProperty("parserName")
     private String parserName;
     
-    //"configA=1;configB=2"
+    //"tsColName=timestamp;x=y"
     @JsonProperty("parserProperties")
     private String parserProperties;