You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/09/09 04:48:59 UTC
[2/3] incubator-kylin git commit: KYLIN-924 add an analyzer to check
kafka data quality
KYLIN-924 add an analyzer to check kafka data quality
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/51196d84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/51196d84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/51196d84
Branch: refs/heads/2.x-staging
Commit: 51196d8454215a62598aa019a598c6fe3c2ae7d1
Parents: 02178b1
Author: honma <ho...@ebay.com>
Authored: Tue Sep 8 22:07:07 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Sep 9 10:50:52 2015 +0800
----------------------------------------------------------------------
.../kylin/job/tools/StreamingInputAnalyzer.java | 242 +++++++++++++++++++
.../apache/kylin/job/tools/TimeHistogram.java | 85 +++++++
2 files changed, 327 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/51196d84/job/src/main/java/org/apache/kylin/job/tools/StreamingInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/StreamingInputAnalyzer.java b/job/src/main/java/org/apache/kylin/job/tools/StreamingInputAnalyzer.java
new file mode 100644
index 0000000..caf582c
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/StreamingInputAnalyzer.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import kafka.api.OffsetRequest;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.DaemonThreadFactory;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.streaming.KafkaClusterConfig;
+import org.apache.kylin.streaming.KafkaConsumer;
+import org.apache.kylin.streaming.KafkaRequester;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StreamingConfig;
+import org.apache.kylin.streaming.StreamingManager;
+import org.apache.kylin.streaming.StreamingUtil;
+import org.apache.kylin.streaming.TimedJsonStreamParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Continuously run this as a daemon to discover how "disordered" the kafka queue is.
+ * This daemon only store a digest so it should not be space-consuming
+ */
+public class StreamingInputAnalyzer extends AbstractHadoopJob {
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_STREAMING = OptionBuilder.withArgName("streaming").hasArg().isRequired(true).withDescription("Name of the streaming").create("streaming");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_TASK = OptionBuilder.withArgName("task").hasArg().isRequired(true).withDescription("get delay or get disorder degree").create("task");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName").hasArg().isRequired(true).withDescription("field name of the ts").create("tsColName");
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingInputAnalyzer.class);
+
+ private StreamParser parser;
+ private StreamingConfig streamingConfig;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ logger.info("jobs args: " + Arrays.toString(args));
+ try {
+ options.addOption(OPTION_STREAMING);
+ options.addOption(OPTION_TASK);
+ options.addOption(OPTION_TSCOLNAME);
+ parseOptions(options, args);
+ logger.info("options: '" + getOptionsAsString() + "'");
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage(options);
+ return -1;
+ }
+
+ String streaming = getOptionValue(OPTION_STREAMING);
+ String task = getOptionValue(OPTION_TASK);
+ String tsColName = getOptionValue(OPTION_TSCOLNAME);
+
+ streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streaming);
+ parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), "formatTs=true;tsColName=" + tsColName);
+
+ if ("disorder".equalsIgnoreCase(task)) {
+ analyzeDisorder();
+ } else if ("delay".equalsIgnoreCase(task)) {
+ analyzeLatency();
+ } else {
+ printUsage(options);
+ return -1;
+ }
+
+ return 0;
+ }
+
+ private List<BlockingQueue<StreamMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, long whichtime) {
+ List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
+ for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
+ final kafka.cluster.Broker leadBroker = StreamingUtil.getLeadBroker(kafkaClusterConfig, partitionId);
+ long streamingOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, whichtime, leadBroker, kafkaClusterConfig);
+ logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId);
+ KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
+ Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
+ result.add(consumer.getStreamQueue(0));
+ }
+ return result;
+ }
+
+ private List<BlockingQueue<StreamMessage>> consumeAll(long whichtime) {
+ int clusterId = 0;
+ final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
+
+ for (final KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, whichtime);
+ queues.addAll(oneClusterQueue);
+ logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+ clusterId++;
+ }
+ return queues;
+ }
+
+ private void analyzeLatency() throws InterruptedException {
+ long[] intervals = new long[] { 1, 5, 60, 300, 1800 };
+ final List<BlockingQueue<StreamMessage>> allPartitionData = consumeAll(OffsetRequest.LatestTime());
+ final List<TimeHistogram> allHistograms = Lists.newArrayList();
+ final TimeHistogram overallHistogram = new TimeHistogram(intervals, "overall");
+
+ ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
+ for (int i = 0; i < allPartitionData.size(); ++i) {
+ final int index = i;
+ allHistograms.add(new TimeHistogram(intervals, "" + i));
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ StreamMessage message = allPartitionData.get(index).take();
+ ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+ long t = parsedStreamMessage.getTimestamp();
+ allHistograms.get(index).processMillis(System.currentTimeMillis() - t);
+ overallHistogram.processMillis(System.currentTimeMillis() - t);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ });
+ }
+
+ while (true) {
+ System.out.println("Printing status at : " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime()));
+
+ for (TimeHistogram histogram : allHistograms) {
+ histogram.printStatus();
+ }
+ overallHistogram.printStatus();
+ Thread.sleep(300000);
+ }
+ }
+
+ private void analyzeDisorder() throws InterruptedException {
+ final List<BlockingQueue<StreamMessage>> allPartitionData = consumeAll(OffsetRequest.EarliestTime());
+
+ final List<Long> wallClocks = Lists.newArrayList();
+ final List<Long> wallOffset = Lists.newArrayList();
+ final List<Long> maxDisorderTime = Lists.newArrayList();
+ final List<Long> maxDisorderOffset = Lists.newArrayList();
+ final List<Long> processedMessages = Lists.newArrayList();
+
+ for (int i = 0; i < allPartitionData.size(); i++) {
+ wallClocks.add(0L);
+ wallOffset.add(0L);
+ maxDisorderTime.add(0L);
+ maxDisorderOffset.add(0L);
+ processedMessages.add(0L);
+ }
+
+ ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
+ final CountDownLatch countDownLatch = new CountDownLatch(allPartitionData.size());
+ for (int i = 0; i < allPartitionData.size(); ++i) {
+ final int index = i;
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ StreamMessage message = allPartitionData.get(index).poll(60, TimeUnit.SECONDS);
+ if (message == null) {
+ System.out.println(String.format("Thread %d is exiting", index));
+ break;
+ }
+ ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+ long t = parsedStreamMessage.getTimestamp();
+ long offset = parsedStreamMessage.getOffset();
+ if (t < wallClocks.get(index)) {
+ maxDisorderTime.set(index, Math.max(wallClocks.get(index) - t, maxDisorderTime.get(index)));
+ maxDisorderOffset.set(index, Math.max(offset - wallOffset.get(index), maxDisorderOffset.get(index)));
+ } else {
+ wallClocks.set(index, t);
+ wallOffset.set(index, offset);
+ }
+ processedMessages.set(index, processedMessages.get(index) + 1);
+
+ if (processedMessages.get(index) % 10000 == 1) {
+ System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d",//
+ index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
+ }
+ }
+
+ System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d",//
+ index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
+ countDownLatch.countDown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ countDownLatch.await();
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ int exitCode = ToolRunner.run(new StreamingInputAnalyzer(), args);
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/51196d84/job/src/main/java/org/apache/kylin/job/tools/TimeHistogram.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/TimeHistogram.java b/job/src/main/java/org/apache/kylin/job/tools/TimeHistogram.java
new file mode 100644
index 0000000..fd3d95f
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/tools/TimeHistogram.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TimeHistogram {
+ private long[] bucketsBoundary;
+ private AtomicLong[] counters;
+ private String id;
+
+ private static Object printLock = new Object();
+
+ /**
+ * example: [10,20] will generate three buckets: (-∞,10), [10,20),[20,+∞)
+ * unit: second
+ */
+ public TimeHistogram(long[] bucketsBoundary, String id) {
+ this.bucketsBoundary = bucketsBoundary;
+ this.counters = new AtomicLong[this.bucketsBoundary.length + 1];
+ for (int i = 0; i < counters.length; i++) {
+ this.counters[i] = new AtomicLong();
+ }
+ this.id = id;
+ }
+
+ /**
+ * @param second in seconds
+ */
+ public void process(long second) {
+ for (int i = 0; i < bucketsBoundary.length; ++i) {
+ if (second < bucketsBoundary[i]) {
+ counters[i].incrementAndGet();
+ return;
+ }
+ }
+
+ counters[bucketsBoundary.length].incrementAndGet();
+ }
+
+ /**
+ * @param millis in milli seconds
+ */
+ public void processMillis(long millis) {
+ process(millis / 1000);
+ }
+
+ public void printStatus() {
+ long[] countersSnapshot = new long[counters.length];
+ for (int i = 0; i < countersSnapshot.length; i++) {
+ countersSnapshot[i] = counters[i].get();
+ }
+
+ long sum = 0;
+ for (long counter : countersSnapshot) {
+ sum += counter;
+ }
+
+ synchronized (printLock) {
+ System.out.println("============== status of TimeHistogram " + id + " =================");
+
+ for (int i = 0; i < countersSnapshot.length; ++i) {
+ System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 1 : sum)));
+ }
+
+ }
+ }
+
+}