You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/09/22 10:49:12 UTC
[1/2] incubator-kylin git commit: KYLIN-1011
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-1011 [created] 78e14681b
KYLIN-1011
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/206c354b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/206c354b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/206c354b
Branch: refs/heads/KYLIN-1011
Commit: 206c354b1e2e6f56da1514b87901bc285423709d
Parents: db6febc
Author: qianhao.zhou <qi...@ebay.com>
Authored: Wed Sep 9 19:13:01 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Sep 22 14:20:56 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/KafkaDataLoader.java | 54 +++++++
build/bin/kylin.sh | 4 +-
build/script/compress.sh | 1 +
.../kylin/engine/streaming/StreamingCLI.java | 99 ------------
.../kylin/engine/streaming/cli/MonitorCLI.java | 70 +++++++++
.../engine/streaming/cli/StreamingCLI.java | 120 +++++++++++++++
.../streaming/monitor/StreamingMonitor.java | 154 +++++++++++++++++++
.../engine/streaming/util/StreamingUtils.java | 2 +-
.../kafka/ByteBufferBackedInputStream.java | 53 +++++++
.../kylin/source/kafka/KafkaStreamingInput.java | 6 +-
.../source/kafka/TimedJsonStreamParser.java | 142 +++++++++++++++++
.../kylin/source/kafka/config/KafkaConfig.java | 12 ++
.../apache/kylin/job/monitor/MonitorCLI.java | 69 ---------
.../kylin/job/monitor/StreamingMonitor.java | 154 -------------------
.../kylin/job/streaming/KafkaDataLoader.java | 54 -------
.../kylin/job/streaming/StreamingBootstrap.java | 2 +-
16 files changed, 614 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..95fbc9d
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.streaming.BrokerConfig;
+import org.apache.kylin.streaming.KafkaClusterConfig;
+import org.apache.kylin.streaming.StreamingConfig;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Test helper that loads a prepared list of messages into Kafka.
+ */
+public class KafkaDataLoader {
+
+    /**
+     * Sends every entry of {@code messages} to the topic of {@code streamingConfig}.
+     * Only the brokers of the first Kafka cluster config are used, and each message is keyed
+     * by its index in the list.
+     *
+     * @param streamingConfig supplies the broker list (first cluster only) and the target topic
+     * @param messages        payloads to send, in list order
+     */
+    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
+
+        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+        // Build the "host1:port1,host2:port2" string expected by metadata.broker.list.
+        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+            @Nullable
+            @Override
+            public String apply(BrokerConfig brokerConfig) {
+                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+            }
+        }), ",");
+        Properties props = new Properties();
+        props.put("metadata.broker.list", brokerList);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        Producer<String, String> producer = new Producer<String, String>(config);
+        try {
+            List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
+            for (int i = 0; i < messages.size(); ++i) {
+                KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+                keyedMessages.add(keyedMessage);
+            }
+            producer.send(keyedMessages);
+        } finally {
+            // Close even when send() throws, so the producer's resources are not leaked.
+            producer.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 2e9cf24..7f7a986 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -115,7 +115,7 @@ then
-Dkylin.hive.dependency=${hive_dependency} \
-Dkylin.hbase.dependency=${hbase_dependency} \
-Dspring.profiles.active=${spring_profile} \
- org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
+ org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
echo "streaming started name: $3 id: $4"
exit 0
elif [ $2 == "stop" ]
@@ -170,7 +170,7 @@ then
-Dkylin.hive.dependency=${hive_dependency} \
-Dkylin.hbase.dependency=${hbase_dependency} \
-Dspring.profiles.active=${spring_profile} \
- org.apache.kylin.job.monitor.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
+ org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
exit 0
else
echo "usage: kylin.sh start or kylin.sh stop"
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/build/script/compress.sh
----------------------------------------------------------------------
diff --git a/build/script/compress.sh b/build/script/compress.sh
index a424b98..c70e567 100755
--- a/build/script/compress.sh
+++ b/build/script/compress.sh
@@ -21,6 +21,7 @@ rm -rf lib tomcat commit_SHA1
find kylin-${version} -type d -exec chmod 755 {} \;
find kylin-${version} -type f -exec chmod 644 {} \;
find kylin-${version} -type f -name "*.sh" -exec chmod 755 {} \;
+mkdir -p ../dist
tar -cvzf ../dist/kylin-${version}.tar.gz kylin-${version}
rm -rf kylin-${version}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
deleted file mode 100644
index 8bf52c1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class StreamingCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
- public static void main(String[] args) {
- try {
- AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
-
- Preconditions.checkArgument(args[0].equals("streaming"));
- Preconditions.checkArgument(args[1].equals("start"));
-
- int i = 2;
- BootstrapConfig bootstrapConfig = new BootstrapConfig();
- while (i < args.length) {
- String argName = args[i];
- switch (argName) {
- case "-oneoff":
- bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
- break;
- case "-start":
- bootstrapConfig.setStart(Long.parseLong(args[++i]));
- break;
- case "-end":
- bootstrapConfig.setEnd(Long.parseLong(args[++i]));
- break;
- case "-streaming":
- bootstrapConfig.setStreaming(args[++i]);
- break;
- case "-partition":
- bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
- break;
- case "-fillGap":
- bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
- break;
- default:
- logger.warn("ignore this arg:" + argName);
- }
- i++;
- }
- final Runnable runnable = new OneOffStreamingBuilder(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()).build();
- runnable.run();
- logger.info("streaming process stop, exit with 0");
- System.exit(0);
- } catch (Exception e) {
- printArgsError(args);
- logger.error("error start streaming", e);
- System.exit(-1);
- }
- }
-
- private static void printArgsError(String[] args) {
- logger.warn("invalid args:" + StringUtils.join(args, " "));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
new file mode 100644
index 0000000..d7dc6b3
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.engine.streaming.cli;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Command line entry point for the streaming monitor checks.
+ */
+public class MonitorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
+
+    /**
+     * Parses the command line and runs the requested checks.
+     * <p>
+     * The first argument must be {@code monitor}. Recognised options, each followed by a value:
+     * {@code -receivers} (semicolon separated, mandatory), {@code -host}, {@code -tableName},
+     * {@code -authorization}, {@code -cubeName} and {@code -projectName} (defaults to "default").
+     * The count check runs when {@code -tableName} is given (it also needs {@code -host} and
+     * {@code -authorization}); the cube check runs when {@code -cubeName} is given.
+     *
+     * @param args raw command line arguments
+     * @throws IllegalArgumentException if the first argument is not {@code monitor}, an option has
+     *                                  no value, or no receiver is given
+     * @throws RuntimeException         if an unknown option is encountered
+     */
+    public static void main(String[] args) {
+        // Guard the length first: an empty command line used to die with a bare ArrayIndexOutOfBoundsException.
+        Preconditions.checkArgument(args.length > 0 && args[0].equals("monitor"), "the first argument must be 'monitor'");
+
+        int i = 1;
+        List<String> receivers = null;
+        String host = null;
+        String tableName = null;
+        String authorization = null;
+        String cubeName = null;
+        String projectName = "default";
+        while (i < args.length) {
+            String argName = args[i];
+            switch (argName) {
+            case "-receivers":
+                receivers = Lists.newArrayList(StringUtils.split(optionValue(args, ++i, argName), ";"));
+                break;
+            case "-host":
+                host = optionValue(args, ++i, argName);
+                break;
+            case "-tableName":
+                tableName = optionValue(args, ++i, argName);
+                break;
+            case "-authorization":
+                authorization = optionValue(args, ++i, argName);
+                break;
+            case "-cubeName":
+                cubeName = optionValue(args, ++i, argName);
+                break;
+            case "-projectName":
+                projectName = optionValue(args, ++i, argName);
+                break;
+            default:
+                throw new RuntimeException("invalid argName:" + argName);
+            }
+            i++;
+        }
+        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
+        final StreamingMonitor streamingMonitor = new StreamingMonitor();
+        if (tableName != null) {
+            // Validate before logging so a misconfigured run fails with a clear message first.
+            Preconditions.checkNotNull(host, "-host is required together with -tableName");
+            Preconditions.checkNotNull(authorization, "-authorization is required together with -tableName");
+            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
+        }
+        if (cubeName != null) {
+            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
+            streamingMonitor.checkCube(receivers, cubeName, host);
+        }
+        System.exit(0);
+    }
+
+    /** Returns {@code args[index]}, or fails with a readable message when the option's value is missing. */
+    private static String optionValue(String[] args, int index, String argName) {
+        if (index >= args.length) {
+            throw new IllegalArgumentException("missing value for argument:" + argName);
+        }
+        return args[index];
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
new file mode 100644
index 0000000..a4ccabc
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -0,0 +1,120 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.engine.streaming.cli;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Command line entry point that runs one-off streaming cube builds.
+ */
+public class StreamingCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
+
+    /**
+     * Expects {@code streaming start} followed by option/value pairs: {@code -streaming},
+     * {@code -start}, {@code -end}, {@code -partition}, {@code -fillGap} and {@code -oneoff}.
+     * Unknown options are logged and skipped.
+     * <p>
+     * With {@code -fillGap true} one build is run per gap found between the cube's segments;
+     * otherwise a single build is run for the given start/end range. The JVM exits with 0 on
+     * success and with -1 on any exception.
+     *
+     * @param args raw command line arguments
+     */
+    public static void main(String[] args) {
+        try {
+            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
+
+            Preconditions.checkArgument(args[0].equals("streaming"));
+            Preconditions.checkArgument(args[1].equals("start"));
+
+            int i = 2;
+            BootstrapConfig bootstrapConfig = new BootstrapConfig();
+            while (i < args.length) {
+                String argName = args[i];
+                switch (argName) {
+                case "-oneoff":
+                    // The value is consumed so the parser stays aligned, but it is not used:
+                    // both execution paths below run one-off builds.
+                    Boolean.parseBoolean(args[++i]);
+                    break;
+                case "-start":
+                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
+                    break;
+                case "-end":
+                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+                    break;
+                case "-streaming":
+                    bootstrapConfig.setStreaming(args[++i]);
+                    break;
+                case "-partition":
+                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+                    break;
+                case "-fillGap":
+                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
+                    break;
+                default:
+                    logger.warn("ignore this arg:" + argName);
+                }
+                i++;
+            }
+            if (bootstrapConfig.isFillGap()) {
+                final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
+                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+                logger.info("all gaps:" + StringUtils.join(gaps, ","));
+                for (Pair<Long, Long> gap : gaps) {
+                    startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
+                }
+            } else {
+                startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+            }
+            // Exit explicitly on both paths: the builds run synchronously, and without this the
+            // fill-gap path could be kept alive by lingering non-daemon threads.
+            logger.info("streaming process finished, exit with 0");
+            System.exit(0);
+        } catch (Exception e) {
+            printArgsError(args);
+            logger.error("error start streaming", e);
+            System.exit(-1);
+        }
+    }
+
+    /** Builds the one-off streaming job for the given range and runs it on the calling thread. */
+    private static void startOneOffCubeStreaming(String streaming, long start, long end) {
+        final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build();
+        runnable.run();
+    }
+
+    /** Logs the full argument list so a failed start can be diagnosed. */
+    private static void printArgsError(String[] args) {
+        logger.warn("invalid args:" + StringUtils.join(args, " "));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..a6b8a9f
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
@@ -0,0 +1,154 @@
+package org.apache.kylin.engine.streaming.monitor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Health checks for streaming cubes. Findings are logged and mailed to a list of receivers.
+ */
+public class StreamingMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
+
+    /**
+     * Posts a {@code select count(*)} query for {@code tableName} to {@code host + "/kylin/api/query"}
+     * and mails the outcome: the status code plus results/duration on HTTP 200, the raw response
+     * on any other status, or the stack trace when the request itself fails.
+     *
+     * @param receivers     mail recipients
+     * @param host          base URL of the server to query
+     * @param authorization value sent after {@code "Basic "} in the Authorization header
+     * @param projectName   project the query runs in
+     * @param tableName     table to count
+     */
+    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
+        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
+        StringBuilder stringBuilder = new StringBuilder();
+        String url = host + "/kylin/api/query";
+        PostMethod request = new PostMethod(url);
+        try {
+
+            request.addRequestHeader("Authorization", "Basic " + authorization);
+            request.addRequestHeader("Content-Type", "application/json");
+            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
+            // Encode explicitly: JSON bodies are UTF-8, and the platform default charset may differ.
+            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+
+            int statusCode = new HttpClient().executeMethod(request);
+            String msg = Bytes.toString(request.getResponseBody());
+            stringBuilder.append("host:").append(host).append("\n");
+            stringBuilder.append("query:").append(query).append("\n");
+            stringBuilder.append("statusCode:").append(statusCode).append("\n");
+            if (statusCode == 200) {
+                title += "succeed";
+                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
+                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
+                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
+            } else {
+                title += "failed";
+                stringBuilder.append("response:").append(msg).append("\n");
+            }
+        } catch (Exception e) {
+            // Capture the stack trace as text so it can be included in the mail body.
+            final StringWriter out = new StringWriter();
+            e.printStackTrace(new PrintWriter(out));
+            title += "failed";
+            stringBuilder.append(out.toString());
+        } finally {
+            request.releaseConnection();
+        }
+        logger.info("title:" + title);
+        logger.info("content:" + stringBuilder.toString());
+        sendMail(receivers, title, stringBuilder.toString());
+    }
+
+    /**
+     * Finds the uncovered ranges between consecutive READY segments of a cube.
+     *
+     * @param cubeName cube to inspect
+     * @return one (end of earlier segment, start of next segment) pair per gap, in segment order
+     */
+    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+            }
+        }
+        return gaps;
+    }
+
+    /**
+     * Reloads the cube and returns its READY segments sorted by their natural ordering.
+     * NOTE(review): the ordering is whatever CubeSegment.compareTo defines, presumably the date range - confirm.
+     */
+    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        Preconditions.checkNotNull(cube);
+        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+        logger.info("totally " + segments.size() + " cubeSegments");
+        Collections.sort(segments);
+        return segments;
+    }
+
+    /**
+     * Finds consecutive READY segments whose date ranges overlap, i.e. the earlier segment ends
+     * after the next one starts. Contiguous segments and gaps are not reported.
+     *
+     * @param cubeName cube to inspect
+     * @return one (earlier segment name, next segment name) pair per overlap, in segment order
+     */
+    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            // Strictly greater: a plain "not equal" test would also report gaps as overlaps.
+            if (first.getDateRangeEnd() > second.getDateRangeStart()) {
+                overlaps.add(Pair.newPair(first.getName(), second.getName()));
+            }
+        }
+        return overlaps;
+    }
+
+    /**
+     * Checks a cube for gaps and overlaps between its READY segments and mails a report when any
+     * are found. Does nothing but log when the cube does not exist.
+     *
+     * @param receivers mail recipients
+     * @param cubeName  cube to inspect
+     * @param host      host name, used only in the mail subject
+     */
+    public void checkCube(List<String> receivers, String cubeName, String host) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        List<Pair<Long, Long>> gaps = findGaps(cubeName);
+        // Actually compute the overlaps; an always-empty list made the overlap report unreachable.
+        List<Pair<String, String>> overlaps = findOverlaps(cubeName);
+        StringBuilder content = new StringBuilder();
+        if (!gaps.isEmpty()) {
+            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
+                @Nullable
+                @Override
+                public String apply(Pair<Long, Long> input) {
+                    return parseInterval(input);
+                }
+            }), "\n")).append("\n");
+        }
+        if (!overlaps.isEmpty()) {
+            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+        }
+        if (content.length() > 0) {
+            logger.info(content.toString());
+            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
+        } else {
+            logger.info("no gaps or overlaps");
+        }
+    }
+
+    /** Formats an interval as {@code {start(date), end(date)}}, showing each bound both raw and as a {@link Date}. */
+    private String parseInterval(Pair<Long, Long> interval) {
+        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
+    }
+
+    /**
+     * Mails {@code content} under {@code title} to the receivers, using a mail service built from the environment config.
+     * NOTE(review): the trailing {@code false} presumably means "not HTML" - confirm against MailService.
+     */
+    private void sendMail(List<String> receivers, String title, String content) {
+        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
+        mailService.sendMail(receivers, title, content, false);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
index 718fc43..47db924 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
@@ -53,7 +53,7 @@ public class StreamingUtils {
}
public static IStreamingOutput getStreamingOutput(String streaming) {
- return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStreamingOutput");
+ return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
}
public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..5883493
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link InputStream} view over a {@link ByteBuffer}. Reading consumes the buffer: every byte
+ * returned advances the buffer's position, and end of stream is reached when nothing remains.
+ */
+class ByteBufferBackedInputStream extends InputStream {
+
+    private final ByteBuffer buf;
+
+    /**
+     * @param buf buffer to read from, starting at its current position; it is not copied
+     */
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    /**
+     * @return the next byte as a value in 0..255, or -1 when the buffer is exhausted
+     */
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        // Mask to avoid sign extension, so byte 0xFF is returned as 255 rather than -1 (EOF).
+        return buf.get() & 0xFF;
+    }
+
+    /**
+     * Copies up to {@code len} remaining bytes into {@code bytes} starting at {@code off}.
+     *
+     * @return the number of bytes copied, 0 when {@code len} is 0, or -1 when the buffer is exhausted
+     * @throws NullPointerException      if {@code bytes} is null
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range inside {@code bytes}
+     */
+    @Override
+    public int read(byte[] bytes, int off, int len)
+            throws IOException {
+        if (bytes == null) {
+            throw new NullPointerException("bytes");
+        }
+        if (off < 0 || len < 0 || len > bytes.length - off) {
+            throw new IndexOutOfBoundsException("off=" + off + " len=" + len + " length=" + bytes.length);
+        }
+        // The InputStream contract requires 0, not -1, for a zero-length read, even at end of stream.
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        final int count = Math.min(len, buf.remaining());
+        buf.get(bytes, off, count);
+        return count;
+    }
+
+    /**
+     * @return the number of bytes left in the buffer; all of them can be read without blocking
+     */
+    @Override
+    public int available() throws IOException {
+        return buf.remaining();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 393b8e7..09dee50 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -79,6 +79,7 @@ public class KafkaStreamingInput implements IStreamingInput {
@Override
public StreamingBatch getBatchWithTimeWindow(String streaming, int id, long startTime, long endTime) {
try {
+ logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
final KafkaConfig kafkaConfig = kafkaConfigManager.getStreamingConfig(streaming);
@@ -106,6 +107,7 @@ public class KafkaStreamingInput implements IStreamingInput {
}
}
final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
+ logger.info("finish to get streaming batch, total message count:" + messages.size());
return new StreamingBatch(messages, timeRange);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("failed to create instance of StreamingParser", e);
@@ -220,8 +222,8 @@ public class KafkaStreamingInput implements IStreamingInput {
});
if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
Class clazz = Class.forName(kafkaConfig.getParserName());
- Constructor constructor = clazz.getConstructor(List.class);
- return (StreamingParser) constructor.newInstance(columns);
+ Constructor constructor = clazz.getConstructor(List.class, String.class);
+ return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties());
} else {
throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
new file mode 100644
index 0000000..9b5071b
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -0,0 +1,142 @@
+/*
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ * contributor license agreements. See the NOTICE file distributed with
+ *
+ * this work for additional information regarding copyright ownership.
+ *
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ * (the "License"); you may not use this file except in compliance with
+ *
+ * the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ *
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ *
+ * limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Parses Kafka messages whose payload is a flat JSON object carrying a timestamp field
+ * (named "timestamp" unless overridden through the {@code tsColName} property).
+ */
+public final class TimedJsonStreamParser implements StreamingParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
+
+    private List<TblColRef> allColumns;
+    private boolean formatTs = false;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private String tsColName = "timestamp";
+    // Every JSON value is read as a String, so the payload is treated as a String-to-String map.
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+    /**
+     * @param allColumns    columns to emit, in output order
+     * @param propertiesStr optional {@code key=value} pairs separated by ';'. Recognised keys are
+     *                      {@code formatTs} (boolean) and {@code tsColName}; unknown keys and
+     *                      entries that are not exactly {@code key=value} are ignored
+     */
+    public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+        this.allColumns = allColumns;
+        if (!StringUtils.isEmpty(propertiesStr)) {
+            String[] properties = propertiesStr.split(";");
+            for (String prop : properties) {
+                try {
+                    String[] parts = prop.split("=");
+                    if (parts.length == 2) {
+                        switch (parts[0]) {
+                        case "formatTs":
+                            this.formatTs = Boolean.parseBoolean(parts[1]);
+                            break;
+                        case "tsColName":
+                            this.tsColName = parts[1];
+                            break;
+                        default:
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    // Best effort: a bad property must not prevent the parser from starting,
+                    // but keep the cause in the log so it can be diagnosed.
+                    logger.error("Failed to parse property " + prop, e);
+                }
+            }
+        }
+
+        logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", formatTs, tsColName);
+    }
+
+    /**
+     * Converts one Kafka message into a {@link StreamingMessage} with one value per configured column.
+     * Columns named {@code minute_start}, {@code hour_start} and {@code day_start} (case-insensitive)
+     * are derived from the message timestamp; every other column is looked up in the JSON object
+     * by its lower-cased name, and is {@code null} when absent.
+     *
+     * @param messageAndOffset raw Kafka message and its offset
+     * @return the parsed message, carrying the offset and the timestamp
+     * @throws RuntimeException      if the payload cannot be read as JSON
+     * @throws NumberFormatException if the timestamp field is present but not a valid long
+     */
+    @Override
+    public StreamingMessage parse(MessageAndOffset messageAndOffset) {
+        try {
+            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            String tsStr = root.get(tsColName);
+            // A missing or empty timestamp is tolerated on purpose and mapped to 0, not rejected.
+            final long t = StringUtils.isEmpty(tsStr) ? 0 : Long.parseLong(tsStr);
+            ArrayList<String> result = Lists.newArrayList();
+
+            for (TblColRef column : allColumns) {
+                String columnName = column.getName();
+                if (columnName.equalsIgnoreCase("minute_start")) {
+                    long minuteStart = TimeUtil.getMinuteStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart));
+                } else if (columnName.equalsIgnoreCase("hour_start")) {
+                    long hourStart = TimeUtil.getHourStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart));
+                } else if (columnName.equalsIgnoreCase("day_start")) {
+                    // day_start is always emitted as a date string, regardless of formatTs
+                    long ts = TimeUtil.getDayStart(t);
+                    result.add(DateFormat.formatToDateStr(ts));
+                } else {
+                    // Locale.ROOT keeps the key stable, e.g. under a Turkish default locale "ID" must map to "id".
+                    String x = root.get(columnName.toLowerCase(Locale.ROOT));
+                    result.add(x);
+                }
+            }
+
+            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
+
+        } catch (IOException e) {
+            logger.error("error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return always {@code true}; this parser accepts every message
+     */
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index b56231a..1aff0ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -81,6 +81,10 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("margin")
private long margin;
+ //parser options serialized as semicolon-separated key=value pairs, e.g. "configA=1;configB=2"
+ @JsonProperty("parserProperties")
+ private String parserProperties;
+
public List<KafkaClusterConfig> getKafkaClusterConfigs() {
return kafkaClusterConfigs;
}
@@ -141,6 +145,14 @@ public class KafkaConfig extends RootPersistentEntity {
this.margin = margin;
}
+ public String getParserProperties() {
+ return parserProperties; //the raw string as stored; it is not split into key/value pairs here
+ }
+
+ public void setParserProperties(String parserProperties) {
+ this.parserProperties = parserProperties; //stored verbatim, no format validation
+ }
+
@Override
public KafkaConfig clone() {
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
deleted file mode 100644
index 7b9831a..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
- public static void main(String[] args) {
- Preconditions.checkArgument(args[0].equals("monitor"));
-
- int i = 1;
- List<String> receivers = null;
- String host = null;
- String tableName = null;
- String authorization = null;
- String cubeName = null;
- String projectName = "default";
- while (i < args.length) {
- String argName = args[i];
- switch (argName) {
- case "-receivers":
- receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
- break;
- case "-host":
- host = args[++i];
- break;
- case "-tableName":
- tableName = args[++i];
- break;
- case "-authorization":
- authorization = args[++i];
- break;
- case "-cubeName":
- cubeName = args[++i];
- break;
- case "-projectName":
- projectName = args[++i];
- break;
- default:
- throw new RuntimeException("invalid argName:" + argName);
- }
- i++;
- }
- Preconditions.checkArgument(receivers != null && receivers.size() > 0);
- final StreamingMonitor streamingMonitor = new StreamingMonitor();
- if (tableName != null) {
- logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
- Preconditions.checkNotNull(host);
- Preconditions.checkNotNull(authorization);
- Preconditions.checkNotNull(tableName);
- streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
- }
- if (cubeName != null) {
- logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
- streamingMonitor.checkCube(receivers, cubeName,host);
- }
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
deleted file mode 100644
index e23f065..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
- private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
- public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
- String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
- StringBuilder stringBuilder = new StringBuilder();
- String url = host + "/kylin/api/query";
- PostMethod request = new PostMethod(url);
- try {
-
- request.addRequestHeader("Authorization", "Basic " + authorization);
- request.addRequestHeader("Content-Type", "application/json");
- String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
- request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
- int statusCode = new HttpClient().executeMethod(request);
- String msg = Bytes.toString(request.getResponseBody());
- stringBuilder.append("host:").append(host).append("\n");
- stringBuilder.append("query:").append(query).append("\n");
- stringBuilder.append("statusCode:").append(statusCode).append("\n");
- if (statusCode == 200) {
- title += "succeed";
- final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
- stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
- stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
- } else {
- title += "failed";
- stringBuilder.append("response:").append(msg).append("\n");
- }
- } catch (Exception e) {
- final StringWriter out = new StringWriter();
- e.printStackTrace(new PrintWriter(out));
- title += "failed";
- stringBuilder.append(out.toString());
- } finally {
- request.releaseConnection();
- }
- logger.info("title:" + title);
- logger.info("content:" + stringBuilder.toString());
- sendMail(receivers, title, stringBuilder.toString());
- }
-
- public static final List<Pair<Long, Long>> findGaps(String cubeName) {
- List<CubeSegment> segments = getSortedReadySegments(cubeName);
- List<Pair<Long, Long>> gaps = Lists.newArrayList();
- for (int i = 0; i < segments.size() - 1; ++i) {
- CubeSegment first = segments.get(i);
- CubeSegment second = segments.get(i + 1);
- if (first.getDateRangeEnd() == second.getDateRangeStart()) {
- continue;
- } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
- gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
- }
- }
- return gaps;
- }
-
- private static List<CubeSegment> getSortedReadySegments(String cubeName) {
- final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
- Preconditions.checkNotNull(cube);
- final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
- logger.info("totally " + segments.size() + " cubeSegments");
- Collections.sort(segments);
- return segments;
- }
-
- public static final List<Pair<String, String>> findOverlaps(String cubeName) {
- List<CubeSegment> segments = getSortedReadySegments(cubeName);
- List<Pair<String, String>> overlaps = Lists.newArrayList();
- for (int i = 0; i < segments.size() - 1; ++i) {
- CubeSegment first = segments.get(i);
- CubeSegment second = segments.get(i + 1);
- if (first.getDateRangeEnd() == second.getDateRangeStart()) {
- continue;
- } else {
- overlaps.add(Pair.newPair(first.getName(), second.getName()));
- }
- }
- return overlaps;
- }
-
- public void checkCube(List<String> receivers, String cubeName, String host) {
- final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
- if (cube == null) {
- logger.info("cube:" + cubeName + " does not exist");
- return;
- }
- List<Pair<Long, Long>> gaps = findGaps(cubeName);
- List<Pair<String, String>> overlaps = Lists.newArrayList();
- StringBuilder content = new StringBuilder();
- if (!gaps.isEmpty()) {
- content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
- @Nullable
- @Override
- public String apply(Pair<Long, Long> input) {
- return parseInterval(input);
- }
- }), "\n")).append("\n");
- }
- if (!overlaps.isEmpty()) {
- content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
- }
- if (content.length() > 0) {
- logger.info(content.toString());
- sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
- } else {
- logger.info("no gaps or overlaps");
- }
- }
-
- private String parseInterval(Pair<Long, Long> interval) {
- return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
- }
-
- private void sendMail(List<String> receivers, String title, String content) {
- final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
- mailService.sendMail(receivers, title, content, false);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 95fbc9d..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.streaming.BrokerConfig;
-import org.apache.kylin.streaming.KafkaClusterConfig;
-import org.apache.kylin.streaming.StreamingConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class KafkaDataLoader {
-
- public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
-
- KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
- String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
- @Nullable
- @Override
- public String apply(BrokerConfig brokerConfig) {
- return brokerConfig.getHost() + ":" + brokerConfig.getPort();
- }
- }), ",");
- Properties props = new Properties();
- props.put("metadata.broker.list", brokerList);
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
-
- ProducerConfig config = new ProducerConfig(props);
-
- Producer<String, String> producer = new Producer<String, String>(config);
-
- List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
- for (int i = 0; i < messages.size(); ++i) {
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
- keyedMessages.add(keyedMessage);
- }
- producer.send(keyedMessages);
- producer.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/206c354b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 4212fea..551006f 100644
--- a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -63,7 +63,7 @@ import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.monitor.StreamingMonitor;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.HBaseConnection;
[2/2] incubator-kylin git commit: refactor
Posted by qh...@apache.org.
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/78e14681
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/78e14681
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/78e14681
Branch: refs/heads/KYLIN-1011
Commit: 78e14681b561a82ece2131b2398ad945f75965fa
Parents: 206c354
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Sep 22 16:48:39 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Sep 22 16:48:39 2015 +0800
----------------------------------------------------------------------
assembly/pom.xml | 5 -
.../kylin/job/BuildCubeWithStreamTest.java | 35 +++--
.../apache/kylin/job/BuildIIWithStreamTest.java | 43 +++---
.../java/org/apache/kylin/job/DeployUtil.java | 32 ++---
.../job/ITKafkaBasedIIStreamBuilderTest.java | 85 -----------
.../kylin/job/hadoop/invertedindex/IITest.java | 39 ++---
.../job/streaming/CubeStreamConsumerTest.java | 90 ------------
.../kylin/job/streaming/KafkaDataLoader.java | 11 +-
.../streaming/PeriodicalStreamBuilderTest.java | 144 -------------------
.../invertedindex/streaming/SliceBuilder.java | 81 +++++++++++
.../source/kafka/StringStreamingParser.java | 65 +++++++++
11 files changed, 220 insertions(+), 410 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 99557fb..9f17913 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -49,11 +49,6 @@
<artifactId>kylin-invertedindex</artifactId>
<version>${project.parent.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-streaming</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
<!-- Env & Test -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index b02b2f2..6bfd560 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,21 +34,18 @@
package org.apache.kylin.job;
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.job.streaming.BootstrapConfig;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.StreamingManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -56,6 +53,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
/**
* for streaming cubing case "test_streaming_table"
*/
@@ -84,12 +85,14 @@ public class BuildCubeWithStreamTest {
kylinConfig = KylinConfig.getInstanceFromEnv();
- //Use a random toplic for kafka data stream
- StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
+ final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
+
+ //Use a random topic for kafka data stream
+ KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
streamingConfig.setTopic(UUID.randomUUID().toString());
- StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
+ KafkaConfigManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
- DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
+ DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig);
}
@AfterClass
@@ -120,13 +123,7 @@ public class BuildCubeWithStreamTest {
@Test
public void test() throws Exception {
for (long start = startTime; start < endTime; start += batchInterval) {
- BootstrapConfig bootstrapConfig = new BootstrapConfig();
- bootstrapConfig.setStart(start);
- bootstrapConfig.setEnd(start + batchInterval);
- bootstrapConfig.setOneOff(true);
- bootstrapConfig.setPartitionId(0);
- bootstrapConfig.setStreaming(streamingName);
- StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
+ new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 5ca3b29..89be628 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -39,11 +39,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -55,12 +51,16 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -69,9 +69,6 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.source.hive.HiveTableReader;
import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -184,32 +181,30 @@ public class BuildIIWithStreamTest {
logger.info("measure:" + tblColRef.getName());
}
}
- LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
final IISegment segment = createSegment(iiName);
String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
ToolRunner.run(new IICreateHTableJob(), args);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName, queue, new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0), 0, segment.getIIDesc().getSliceSize());
+ final IIDesc iiDesc = segment.getIIDesc();
+ final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0, iiDesc.isUseLocalDictionary());
List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
int count = sorted.size();
+ ArrayList<StreamingMessage> messages = Lists.newArrayList();
for (String[] row : sorted) {
- logger.info("another row: " + StringUtils.join(row, ","));
- queue.put(parse(row));
+ messages.add(parse(row));
+ if (messages.size() >= iiDesc.getSliceSize()) {
+ //buffer the row first, then flush once a full slice is collected, so no row is dropped and no empty batch is built
+ sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+ messages = Lists.newArrayList();
+ }
+ }
+ if (!messages.isEmpty()) {
+ sliceBuilder.buildSlice(new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
}
reader.close();
logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
- queue.put(StreamMessage.EOF);
- final Future<?> future = executorService.submit(streamBuilder);
- try {
- future.get();
- } catch (Exception e) {
- logger.error("stream build failed", e);
- fail("stream build failed");
- }
-
logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
}
@@ -225,8 +220,8 @@ public class BuildIIWithStreamTest {
}
}
- private StreamMessage parse(String[] row) {
- return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+ private StreamingMessage parse(String[] row) {
+ return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object>emptyMap()); //row values become the message data; wall-clock time is used for both the offset and the timestamp slots
}
private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index d47a664..9ec4c88 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,14 +18,9 @@
package org.apache.kylin.job;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
@@ -35,6 +30,7 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.job.dataGen.FactTableGenerator;
import org.apache.kylin.job.streaming.KafkaDataLoader;
import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
@@ -43,15 +39,16 @@ import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import java.io.*;
+import java.util.List;
public class DeployUtil {
private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -146,14 +143,13 @@ public class DeployUtil {
deployHiveTables();
}
- public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
- CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
+ public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
TableDesc tableDesc = cubeInstance.getFactTableDesc();
-
//load into kafka
- KafkaDataLoader.loadIntoKafka(streamingConfig, data);
- logger.info("Write {} messages into topic {}", data.size(), streamingConfig.getTopic());
+ KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
+ logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
//csv data for H2 use
List<TblColRef> tableColumns = Lists.newArrayList();
@@ -163,7 +159,7 @@ public class DeployUtil {
TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
StringBuilder sb = new StringBuilder();
for (String json : data) {
- List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
+ List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
sb.append(StringUtils.join(rowColumns, ","));
sb.append(System.getProperty("line.separator"));
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
deleted file mode 100644
index 6a615cb..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-@Ignore("this test case will break existing metadata store")
-public class ITKafkaBasedIIStreamBuilderTest {
-
- private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class);
-
- private KylinConfig kylinConfig;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
- }
-
- @Test
- public void test() throws Exception {
- final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
- bootstrap.start("eagle", 0);
- Thread.sleep(30 * 60 * 1000);
- logger.info("time is up, stop streaming");
- bootstrap.stop();
- Thread.sleep(5 * 1000);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index dcd460b..913a2f7 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,12 +10,18 @@ import java.util.Set;
import javax.annotation.Nullable;
+import com.google.common.base.Function;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.util.FIFOIterable;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.index.Slice;
@@ -26,6 +32,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
import org.apache.kylin.invertedindex.model.IIRow;
import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -33,6 +40,8 @@ import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.StreamingParser;
+import org.apache.kylin.source.kafka.StringStreamingParser;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -41,18 +50,11 @@ import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionar
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StringStreamParser;
-import org.apache.kylin.streaming.invertedindex.SliceBuilder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -78,24 +80,23 @@ public class IITest extends LocalFileMetadataTestCase {
this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
this.iiDesc = ii.getDescriptor();
- List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
+ List<MessageAndOffset> messages = Lists.transform(Arrays.asList(inputData), new Function<String, MessageAndOffset>() {
@Nullable
@Override
- public StreamMessage apply(String input) {
- return new StreamMessage(System.currentTimeMillis(), input.getBytes());
+ public MessageAndOffset apply(String input) {
+ return new MessageAndOffset(new Message(input.getBytes()), System.currentTimeMillis());
}
});
- List<List<String>> parsedStreamMessages = Lists.newArrayList();
- StreamParser parser = StringStreamParser.instance;
-
- MicroStreamBatch batch = new MicroStreamBatch(0);
- for (StreamMessage message : streamMessages) {
- ParsedStreamMessage parsedStreamMessage = parser.parse(message);
- if ((parsedStreamMessage.isAccepted())) {
- batch.add(parsedStreamMessage);
+ final StreamingParser parser = StringStreamingParser.instance;
+ final List<StreamingMessage> streamingMessages = Lists.transform(messages, new Function<MessageAndOffset, StreamingMessage>() {
+ @Nullable
+ @Override
+ public StreamingMessage apply(@Nullable MessageAndOffset input) {
+ return parser.parse(input);
}
- }
+ });
+ StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));
iiRows = Lists.newArrayList();
final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
deleted file mode 100644
index be4fa26..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-@Ignore
-public class CubeStreamConsumerTest {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
-
- private KylinConfig kylinConfig;
-
- private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
- System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
- }
-
- @Before
- public void before() throws Exception {
- HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
- final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
- // remove all existing segments
- CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
- }
-
- @Test
- public void test() throws Exception {
- LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
- queues.add(queue);
- StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
- final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
- loadDataFromLocalFile(queue, 100000);
- future.get();
- }
-
- private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
- BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
- String line;
- int count = 0;
- while ((line = br.readLine()) != null && count++ < maxCount) {
- final List<String> strings = Arrays.asList(line.split("\t"));
- queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
- }
- queue.put(StreamMessage.EOF);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 95fbc9d..c3caa9b 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -10,22 +10,21 @@ import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.streaming.BrokerConfig;
-import org.apache.kylin.streaming.KafkaClusterConfig;
-import org.apache.kylin.streaming.StreamingConfig;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
/**
* Load prepared data into kafka(for test use)
*/
public class KafkaDataLoader {
- public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
+ public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
- KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
+ KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
@Nullable
@Override
@@ -44,7 +43,7 @@ public class KafkaDataLoader {
List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
for (int i = 0; i < messages.size(); ++i) {
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
+ KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
keyedMessages.add(keyedMessage);
}
producer.send(keyedMessages);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
deleted file mode 100644
index dc6d312..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.MicroStreamBatchConsumer;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StreamingManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
-
- @Before
- public void setup() {
- this.createTestMetadata();
-
- }
-
- @After
- public void clear() {
- this.cleanupTestMetadata();
- }
-
- private List<StreamMessage> prepareTestData(long start, long end, int count) {
- double step = (double) (end - start) / (count - 1);
- long ts = start;
- int offset = 0;
- ArrayList<StreamMessage> result = Lists.newArrayList();
- for (int i = 0; i < count - 1; ++i) {
- result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
- ts += step;
- }
- result.add(new StreamMessage(offset++, String.valueOf(end).getBytes()));
- assertEquals(count, result.size());
- assertEquals(start + "", new String(result.get(0).getRawData()));
- assertEquals(end + "", new String(result.get(count - 1).getRawData()));
- return result;
- }
-
- @Test
- public void test() throws ExecutionException, InterruptedException {
-
- List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
- queues.add(new LinkedBlockingQueue<StreamMessage>());
- queues.add(new LinkedBlockingQueue<StreamMessage>());
-
- final long interval = 3000L;
- final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
-
- final List<Integer> partitionIds = Lists.newArrayList();
- for (int i = 0; i < queues.size(); i++) {
- partitionIds.add(i);
- }
-
- final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() {
- @Override
- public void consume(MicroStreamBatch microStreamBatch) throws Exception {
- logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
- }
-
- @Override
- public void stop() {
- logger.info("consumer stopped");
- }
- };
- final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
-
- streamBuilder.setStreamParser(new StreamParser() {
- @Override
- public ParsedStreamMessage parse(StreamMessage streamMessage) {
- return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
- }
- });
-
- Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
- long timeout = nextPeriodStart + interval;
- int messageCount = 0;
- int inPeriodMessageCount = 0;
- int expectedOffset = 0;
- logger.info("prepare to add StreamMessage");
- while (true) {
- long ts = System.currentTimeMillis();
- if (ts >= timeout + interval) {
- break;
- }
- if (ts >= nextPeriodStart && ts < timeout) {
- inPeriodMessageCount++;
- }
- for (BlockingQueue<StreamMessage> queue : queues) {
- queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes()));
- }
- if (expectedOffset == 0 && ts >= timeout) {
- expectedOffset = messageCount - 1;
- }
- messageCount++;
- Thread.sleep(10);
- }
- logger.info("totally put " + messageCount + " StreamMessages");
- logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
-
- for (BlockingQueue<StreamMessage> queue : queues) {
- queue.put(StreamMessage.EOF);
- }
-
- future.get();
-
- for (BlockingQueue<StreamMessage> queue : queues) {
- queue.take();
- }
-
- final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
- logger.info("offset:" + offsets);
- for (Long offset : offsets.values()) {
- assertEquals(expectedOffset, offset.longValue());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
new file mode 100644
index 0000000..ba337c8
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.invertedindex.index.BatchSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Builds a {@link Slice} of the inverted index from one {@link StreamingBatch}.
+ *
+ * <p>Each instance wraps a {@link BatchSliceMaker} created for the descriptor and
+ * shard given to the constructor.
+ */
+public final class SliceBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+    /**
+     * Maps a message to its data list. The function holds no state, so a single
+     * shared instance replaces the per-call anonymous class.
+     */
+    private static final Function<StreamingMessage, List<String>> MESSAGE_TO_ROW = new Function<StreamingMessage, List<String>>() {
+        @Nullable
+        @Override
+        public List<String> apply(@Nullable StreamingMessage input) {
+            return input.getData();
+        }
+    };
+
+    private final BatchSliceMaker sliceMaker;
+    private final IIDesc iiDesc;
+    private final boolean useLocalDict;
+
+    /**
+     * @param desc         descriptor handed to the slice maker, the dictionary builder
+     *                     and the {@link TableRecordInfo}
+     * @param shard        shard handed to the {@link BatchSliceMaker}
+     * @param useLocalDict when {@code true}, dictionaries are built from the batch
+     *                     itself; when {@code false}, an array of {@code null}
+     *                     dictionaries (one entry per column of {@code desc}) is used
+     */
+    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+        this.iiDesc = desc;
+        this.sliceMaker = new BatchSliceMaker(desc, shard);
+        this.useLocalDict = useLocalDict;
+    }
+
+    /**
+     * Builds one slice from all messages of the given batch.
+     *
+     * @param microStreamBatch the batch whose messages become the rows of the slice
+     * @return the slice, with the dictionaries used for it attached as its local dictionaries
+     */
+    public Slice buildSlice(StreamingBatch microStreamBatch) {
+        // Lists.transform returns a lazy view: the data list is fetched on each access.
+        final List<List<String>> rows = Lists.transform(microStreamBatch.getMessages(), MESSAGE_TO_ROW);
+        final Dictionary<?>[] dictionaries;
+        if (useLocalDict) {
+            dictionaries = IIDictionaryBuilder.buildDictionary(rows, iiDesc);
+        } else {
+            dictionaries = new Dictionary[iiDesc.listAllColumns().size()];
+        }
+        final TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+        return build(rows, tableRecordInfo, dictionaries);
+    }
+
+    /**
+     * Converts every row into a {@link TableRecord} (value {@code i} of the row is set
+     * at index {@code i} of the record) and hands the records to the slice maker.
+     */
+    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+        final List<TableRecord> records = Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Nullable
+            @Override
+            public TableRecord apply(@Nullable List<String> input) {
+                final TableRecord record = tableRecordInfo.createTableRecord();
+                for (int col = 0; col < input.size(); col++) {
+                    record.setValueString(col, input.get(col));
+                }
+                return record;
+            }
+        });
+        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), records);
+        slice.setLocalDictionaries(localDictionary);
+        return slice;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/78e14681/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
new file mode 100644
index 0000000..307f73a
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+/**
+ * {@link StreamingParser} that reads a Kafka message payload as comma-separated text.
+ *
+ * <p>The class has no instance state and a private constructor; use {@link #instance}.
+ */
+public final class StringStreamingParser implements StreamingParser {
+
+    public static final StringStreamingParser instance = new StringStreamingParser();
+
+    private StringStreamingParser() {
+    }
+
+    /**
+     * Decodes the payload as UTF-8 and splits it on {@code ','}.
+     *
+     * <p>{@link String#split(String)} discards trailing empty strings, so trailing
+     * empty fields of the payload do not appear in the resulting list.
+     *
+     * @param kafkaMessage the Kafka message to convert
+     * @return a message holding the split fields; the Kafka offset is passed for both
+     *         numeric constructor arguments, and the parameter map is empty
+     */
+    @Override
+    public StreamingMessage parse(MessageAndOffset kafkaMessage) {
+        final ByteBuffer payload = kafkaMessage.message().payload();
+        // remaining() is the number of readable bytes; limit() is only equal to it
+        // when the buffer's position is 0, and would otherwise cause an underflow.
+        final byte[] bytes = new byte[payload.remaining()];
+        payload.get(bytes);
+        // Decode with a fixed charset so the result does not depend on the JVM default.
+        final String text = new String(bytes, StandardCharsets.UTF_8);
+        final long offset = kafkaMessage.offset();
+        return new StreamingMessage(Lists.newArrayList(text.split(",")), offset, offset, Collections.<String, Object>emptyMap());
+    }
+
+    /**
+     * Accepts every message.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+}