You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/11 10:14:12 UTC
incubator-kylin git commit: KYLIN-1023 kylin streaming log start end
offset for each partition for data verification
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging d6ffa6058 -> 6d22f3fe7
KYLIN-1023 kylin streaming log start end offset for each partition for data verification
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6d22f3fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6d22f3fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6d22f3fe
Branch: refs/heads/2.x-staging
Commit: 6d22f3fe77706a9fcc27ac6ab20c047e5e872de5
Parents: d6ffa60
Author: honma <ho...@ebay.com>
Authored: Fri Sep 11 16:16:11 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Sep 11 16:16:26 2015 +0800
----------------------------------------------------------------------
.../kylin/engine/streaming/StreamingCLI.java | 2 -
.../org/apache/kylin/job/tools/KafkaVerify.java | 101 +++++++++++++++++++
.../kylin/streaming/MicroStreamBatch.java | 4 +
.../kylin/streaming/OneOffStreamBuilder.java | 2 +-
.../apache/kylin/streaming/StreamFetcher.java | 19 ++--
.../apache/kylin/streaming/StreamingConfig.java | 2 +-
6 files changed, 119 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
index 277ee69..8bf52c1 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
@@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-/**
- */
public class StreamingCLI {
private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
new file mode 100644
index 0000000..ee64e66
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/KafkaVerify.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+
+/**
+ * Stand-alone checker used only to verify Kylin streaming's correctness by
+ * comparing its results to the data in the original Kafka topic.
+ *
+ * <p>Reads a text file holding one JSON object per line, keeps the records whose
+ * {@code sys_ts} lies in {@code [start, end)}, and prints the per-bucket and total
+ * record counts and {@code qty} sums, plus the zero-based line indexes of the
+ * first and last record that were kept.
+ */
+public class KafkaVerify {
+
+    /** A progress line is printed for the first input line and then once every this many lines. */
+    private static final int PROGRESS_STEP = 10000;
+
+    /**
+     * Runs the verification and prints the result to standard output.
+     *
+     * @param args {@code start end interval file}: the inclusive lower bound and
+     *             exclusive upper bound for {@code sys_ts}, the bucket width (same
+     *             unit as {@code sys_ts}), and the path of the file to read
+     * @throws IOException if the file cannot be read or a line is not a valid JSON object
+     * @throws IllegalArgumentException if fewer than four arguments are given,
+     *             {@code interval} is not positive, {@code end} is smaller than
+     *             {@code start}, or the range needs more buckets than an array can hold
+     * @throws NumberFormatException if a numeric argument, a {@code sys_ts} value, or the
+     *             {@code qty} of a kept record is missing or not a valid long
+     */
+    public static void main(String[] args) throws IOException {
+
+        System.out.println("start");
+
+        if (args.length < 4) {
+            throw new IllegalArgumentException("usage: KafkaVerify <start> <end> <interval> <file>");
+        }
+
+        // parseLong yields primitives directly; valueOf would box and immediately unbox.
+        long start = Long.parseLong(args[0]);
+        long end = Long.parseLong(args[1]);
+        long interval = Long.parseLong(args[2]);
+        if (interval <= 0) {
+            throw new IllegalArgumentException("interval must be positive, got " + interval);
+        }
+        if (end < start) {
+            throw new IllegalArgumentException("end (" + end + ") must not be smaller than start (" + start + ")");
+        }
+
+        // Ceiling division: a trailing partial interval still gets its own bucket.
+        long bucketCount = (end - start + interval - 1) / interval;
+        if (bucketCount > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("too many buckets: " + bucketCount);
+        }
+        int bucket = (int) bucketCount;
+
+        ObjectMapper mapper = new ObjectMapper();
+        // Every JSON value is read as a String; numeric fields are parsed explicitly below.
+        JavaType mapType = mapper.getTypeFactory().constructMapType(HashMap.class, String.class, String.class);
+
+        long[] qtySum = new long[bucket];
+        long qtyTotal = 0;
+        long[] counts = new long[bucket];
+        long countTotal = 0;
+        long processed = 0;
+        long minOffset = -1;
+        long maxOffset = -1;
+
+        try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
+            String s;
+            while ((s = br.readLine()) != null) {
+                if (++processed % PROGRESS_STEP == 1) {
+                    System.out.println("processing " + processed);
+                }
+
+                Map<String, String> root = mapper.readValue(s, mapType);
+                String tsStr = root.get("sys_ts");
+
+                // Records without a timestamp cannot be assigned to a bucket.
+                if (StringUtils.isEmpty(tsStr)) {
+                    continue;
+                }
+                long ts = Long.parseLong(tsStr);
+                if (ts < start || ts >= end) {
+                    continue;
+                }
+
+                // Offsets are zero-based line indexes; "processed" was already incremented above.
+                if (minOffset == -1) {
+                    minOffset = processed - 1;
+                }
+                maxOffset = processed - 1;
+
+                long qty = Long.parseLong(root.get("qty"));
+                int index = (int) ((ts - start) / interval);
+                qtySum[index] += qty;
+                qtyTotal += qty;
+                counts[index]++;
+                countTotal++;
+            }
+        }
+
+        System.out.println("qty sum is " + Arrays.toString(qtySum));
+        System.out.println("qty total is " + qtyTotal);
+        System.out.println("count is " + Arrays.toString(counts));
+        System.out.println("count total is " + countTotal);
+        System.out.println("first processed is " + minOffset);
+        System.out.println("last processed is " + maxOffset);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index f4d9e05..27f817e 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -62,6 +62,10 @@ public final class MicroStreamBatch {
return this.rawMessageCount;
}
+    /**
+     * @return the value reported by {@code streams.size()} for this batch
+     */
+    public final int getFilteredMessageCount() {
+        final int filteredCount = this.streams.size();
+        return filteredCount;
+    }
+
public final void add(ParsedStreamMessage parsedStreamMessage) {
if (offset.getFirst() > parsedStreamMessage.getOffset()) {
offset.setFirst(parsedStreamMessage.getOffset());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
index 927873a..ae0f70f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
@@ -45,7 +45,7 @@ public class OneOffStreamBuilder implements Runnable {
final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList();
int partitionId = 0;
for (BlockingQueue<StreamMessage> queue : queues) {
- futures.add(executorService.submit(new StreamFetcher(partitionId, queue, countDownLatch, batchCondition, streamParser)));
+ futures.add(executorService.submit(new StreamFetcher(partitionId++, queue, countDownLatch, batchCondition, streamParser)));
}
countDownLatch.await();
List<MicroStreamBatch> batches = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
index 85d09be..f429a49 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -7,6 +7,8 @@ import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
*/
public class StreamFetcher implements Callable<MicroStreamBatch> {
@@ -26,9 +28,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
this.streamParser = streamParser;
}
- private void clearCounter() {
- }
-
private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
long t = System.currentTimeMillis();
while (true) {
@@ -57,7 +56,6 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
while (true) {
if (microStreamBatch == null) {
microStreamBatch = new MicroStreamBatch(partitionId);
- clearCounter();
}
StreamMessage streamMessage = peek(streamMessageQueue, 60000);
if (streamMessage == null) {
@@ -83,21 +81,28 @@ public class StreamFetcher implements Callable<MicroStreamBatch> {
} else if (result == BatchCondition.Result.LAST_ACCEPT_FOR_BATCH) {
streamMessageQueue.take();
microStreamBatch.add(parsedStreamMessage);
- return microStreamBatch;
+ break;
} else if (result == BatchCondition.Result.DISCARD) {
streamMessageQueue.take();
} else if (result == BatchCondition.Result.REJECT) {
- return microStreamBatch;
+ logger.info("Partition :" + partitionId + " rejecting message at " + parsedStreamMessage.getOffset());
+ break;
}
} else {
streamMessageQueue.take();
}
}
+
+ Preconditions.checkArgument(microStreamBatch != null, "microStreamBatch is null!");
+ logger.info(String.format("Partition %d contributing %d filtered messages out from %d raw messages"//
+ , partitionId, microStreamBatch.getFilteredMessageCount(), microStreamBatch.getRawMessageCount()));
+ return microStreamBatch;
+
} catch (Exception e) {
logger.error("build stream error, stop building", e);
throw new RuntimeException("build stream error, stop building", e);
} finally {
- logger.info("one partition sign off");
+ logger.info("partition {} sign off", partitionId);
countDownLatch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6d22f3fe/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
index 320768b..c2d5361 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingConfig.java
@@ -84,7 +84,7 @@ public class StreamingConfig extends RootPersistentEntity {
@JsonProperty("parserName")
private String parserName;
- //"configA=1;configB=2"
+ //"tsColName=timestamp;x=y"
@JsonProperty("parserProperties")
private String parserProperties;