You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/07/03 13:43:40 UTC

[1/3] kylin git commit: KYLIN-1818: change kafka dep to provided [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/stream_m1 32a08f8bf -> a9c75eaef (forced update)


KYLIN-1818: change kafka dep to provided

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/25081d92
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/25081d92
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/25081d92

Branch: refs/heads/stream_m1
Commit: 25081d928ef081c343044af3587dc7e2830a4a46
Parents: 4ffe818
Author: Yiming Liu <li...@gmail.com>
Authored: Fri Jun 24 15:13:14 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jun 24 15:17:33 2016 +0800

----------------------------------------------------------------------
 assembly/pom.xml                   |  1 -
 build/bin/find-kafka-dependency.sh | 51 +++++++++++++++++++++++++++++++++
 build/bin/kylin.sh                 |  4 +++
 3 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/25081d92/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index d643b62..d7764a0 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -177,7 +177,6 @@
             <artifactId>kafka_2.10</artifactId>
             <version>${kafka.version}</version>
             <scope>provided</scope>
-            <!-- FIXME: Should be provided just like hive and hbase, inflates job jar from 9 MB to 21 MB -->
         </dependency>
 
     </dependencies>

http://git-wip-us.apache.org/repos/asf/kylin/blob/25081d92/build/bin/find-kafka-dependency.sh
----------------------------------------------------------------------
diff --git a/build/bin/find-kafka-dependency.sh b/build/bin/find-kafka-dependency.sh
new file mode 100644
index 0000000..c6b9c24
--- /dev/null
+++ b/build/bin/find-kafka-dependency.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+kafka_home=   # filled from KAFKA_HOME below; the script exits if KAFKA_HOME is unset
+
+if [ -n "$KAFKA_HOME" ]
+then
+    echo "KAFKA_HOME is set to: $KAFKA_HOME, use it to locate kafka dependencies."
+    kafka_home=$KAFKA_HOME
+fi
+
+if [ -z "$KAFKA_HOME" ]
+then
+    echo "Couldn't find kafka home. Please set KAFKA_HOME to the path which contains kafka dependencies."
+    exit 1
+fi
+
+# works for kafka 9+: look for kafka-clients-*.jar; matches are joined with ':' and exported as kafka_dependency
+kafka_client=`find -L "$(dirname "$kafka_home")" -name 'kafka-clients-[a-z0-9A-Z\.-]*.jar' ! -name '*doc*' ! -name '*test*' ! -name '*sources*' ''-printf '%p:' | sed 's/:$//'`
+if [ -z "$kafka_client" ]
+then
+# works for kafka 8: no kafka-clients jar was found, fall back to the kafka_*.jar
+    kafka_broker=`find -L "$(dirname "$kafka_home")" -name 'kafka_[a-z0-9A-Z\.-]*.jar' ! -name '*doc*' ! -name '*test*' ! -name '*sources*' ''-printf '%p:' | sed 's/:$//'`
+    if [ -z "$kafka_broker" ]
+    then
+        echo "kafka client lib not found"
+        exit 1
+    else
+        echo "kafka dependency: $kafka_broker"
+        export kafka_dependency="$kafka_broker"
+    fi
+else
+    echo "kafka dependency: $kafka_client"
+    export kafka_dependency="$kafka_client"
+fi

http://git-wip-us.apache.org/repos/asf/kylin/blob/25081d92/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 2ea3a8c..f3710c3 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -141,6 +141,7 @@ then
         #retrive $hive_dependency and $hbase_dependency
         source ${dir}/find-hive-dependency.sh
         source ${dir}/find-hbase-dependency.sh
+        source ${dir}/find-kafka-dependency.sh
         #retrive $KYLIN_EXTRA_START_OPTS
         if [ -f "${dir}/setenv.sh" ]
             then source ${dir}/setenv.sh
@@ -153,6 +154,7 @@ then
         hbase ${KYLIN_EXTRA_START_OPTS} \
         -Dlog4j.configuration=kylin-log4j.properties\
         -Dkylin.hive.dependency=${hive_dependency} \
+        -Dkylin.kafka.dependency=${kafka_dependency} \
         -Dkylin.hbase.dependency=${hbase_dependency} \
         org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
         echo "streaming started name: $3 id: $4"
@@ -187,6 +189,7 @@ then
     #retrive $hive_dependency and $hbase_dependency
     source ${dir}/find-hive-dependency.sh
     source ${dir}/find-hbase-dependency.sh
+    source ${dir}/find-kafka-dependency.sh
     #retrive $KYLIN_EXTRA_START_OPTS
     if [ -f "${dir}/setenv.sh" ]
         then source ${dir}/setenv.sh
@@ -199,6 +202,7 @@ then
     hbase ${KYLIN_EXTRA_START_OPTS} \
     -Dlog4j.configuration=kylin-log4j.properties\
     -Dkylin.hive.dependency=${hive_dependency} \
+    -Dkylin.kafka.dependency=${kafka_dependency} \
     -Dkylin.hbase.dependency=${hbase_dependency} \
     org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
     exit 0


[2/3] kylin git commit: refactor some streaming classes

Posted by sh...@apache.org.
refactor some streaming classes


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd116a6c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd116a6c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd116a6c

Branch: refs/heads/stream_m1
Commit: cd116a6c5625009fe40f4136792c382a9a355e5f
Parents: 25081d9
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jun 23 10:34:52 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jun 24 15:39:29 2016 +0800

----------------------------------------------------------------------
 .../src/test/java/org/apache/kylin/job/DeployUtil.java    |  2 +-
 .../org/apache/kylin/job/hadoop/invertedindex/IITest.java |  8 ++------
 .../org/apache/kylin/common/util/StreamingMessage.java    |  4 ++++
 .../apache/kylin/source/kafka/KafkaStreamingInput.java    |  3 ++-
 .../org/apache/kylin/source/kafka/StreamingParser.java    |  3 ++-
 .../apache/kylin/source/kafka/StringStreamingParser.java  | 10 ++++------
 .../apache/kylin/source/kafka/TimedJsonStreamParser.java  |  9 ++++-----
 .../kylin/source/kafka/diagnose/KafkaInputAnalyzer.java   |  3 ++-
 .../org/apache/kylin/source/kafka/util/KafkaUtils.java    |  3 ++-
 9 files changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index a0a9f88..d56dd64 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -168,7 +168,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
+            List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index da25143..c34ce55 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -38,11 +38,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.*;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -114,7 +110,7 @@ public class IITest extends LocalFileMetadataTestCase {
             @Nullable
             @Override
             public StreamingMessage apply(@Nullable MessageAndOffset input) {
-                return parser.parse(input);
+                return parser.parse(input.message().payload());
             }
         });
         StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
index f327db2..53ab195 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -51,6 +51,10 @@ public class StreamingMessage {
         return offset;
     }
 
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
     public final long getTimestamp() {
         return timestamp;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 564c221..3243754 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -206,7 +206,8 @@ public class KafkaStreamingInput implements IStreamingInput {
                     for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
                         offset++;
                         consumeMsgCount++;
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             final long timestamp = streamingMessage.getTimestamp();
                             if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 9075c77..3bc42ac 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -40,6 +40,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.common.util.TimeUtil;
@@ -66,7 +67,7 @@ public abstract class StreamingParser {
      * @param message
      * @return StreamingMessage must not be NULL
      */
-    abstract public StreamingMessage parse(Object message);
+    abstract public StreamingMessage parse(ByteBuffer message);
 
     abstract public boolean filter(StreamingMessage streamingMessage);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 5226899..37bcbfa 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -55,12 +55,10 @@ public final class StringStreamingParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object message) {
-        MessageAndOffset kafkaMessage = (MessageAndOffset) message;
-        final ByteBuffer payload = kafkaMessage.message().payload();
-        byte[] bytes = new byte[payload.limit()];
-        payload.get(bytes);
-        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object> emptyMap());
+    public StreamingMessage parse(ByteBuffer message) {
+        byte[] bytes = new byte[message.limit()];
+        message.get(bytes);
+        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 63f5637..4b1c579 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -37,9 +37,9 @@ package org.apache.kylin.source.kafka;
 import java.io.IOException;
 import java.util.*;
 
-import kafka.message.MessageAndOffset;
 
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -95,10 +95,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object msg) {
-        MessageAndOffset messageAndOffset = (MessageAndOffset) msg;
+    public StreamingMessage parse(ByteBuffer buffer) {
         try {
-            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
             Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
             String tsStr = root.get(tsColName);
@@ -119,7 +118,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 }
             }
 
-            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
+            return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap());
 
         } catch (IOException e) {
             logger.error("error", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index 0e29a0c..19fc87f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -109,7 +109,8 @@ public class KafkaInputAnalyzer extends AbstractApplication {
                         offset++;
                         consumeMsgCount++;
 
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             streamQueue.add(streamingMessage);
                         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd116a6c/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index f506999..a2984b6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -162,7 +162,8 @@ public final class KafkaUtils {
         final ByteBuffer payload = messageAndOffset.message().payload();
         byte[] bytes = new byte[payload.limit()];
         payload.get(bytes);
-        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+        streamingMessage.setOffset(messageAndOffset.offset());
         logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
         return streamingMessage.getTimestamp();
 


[3/3] kylin git commit: KYLIN-1726 Scalable streaming cubing

Posted by sh...@apache.org.
KYLIN-1726 Scalable streaming cubing

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a9c75eae
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a9c75eae
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a9c75eae

Branch: refs/heads/stream_m1
Commit: a9c75eaefc0cce4e1073cf4b90a830c6876d599b
Parents: cd116a6
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jul 3 21:43:16 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Jul 3 21:43:16 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   | 11 ++++----
 .../kylin/job/streaming/KafkaDataLoader.java    | 11 ++++++--
 .../kylin/job/streaming/StreamDataLoader.java   | 22 +++++++++++++++
 .../java/org/apache/kylin/source/ISource.java   |  4 +++
 .../org/apache/kylin/source/SourceFactory.java  |  5 ++++
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  1 +
 .../kylin/engine/mr/JobBuilderSupport.java      |  2 +-
 .../engine/mr/common/AbstractHadoopJob.java     | 28 ++++++++++++++++++++
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  1 +
 .../engine/mr/steps/FactDistinctColumnsJob.java |  3 +++
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  1 +
 .../test_streaming_table_cube_desc.json         |  3 +++
 .../kafka/default.streaming_table.json          |  2 +-
 .../streaming/default.streaming_table.json      |  2 +-
 .../table/DEFAULT.STREAMING_TABLE.json          |  3 ++-
 .../kylin/provision/BuildCubeWithStream.java    |  3 ++-
 .../apache/kylin/source/hive/HiveSource.java    |  8 ++++++
 17 files changed, 97 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index d56dd64..5357faa 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -39,7 +39,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.job.streaming.StreamDataLoader;
 import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -48,7 +48,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveClient;
 import org.apache.kylin.source.hive.HiveCmdBuilder;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.slf4j.Logger;
@@ -150,15 +149,15 @@ public class DeployUtil {
         deployHiveTables();
     }
 
-    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
         List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
         TableDesc tableDesc = cubeInstance.getFactTableDesc();
         //load into kafka
-        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
-        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data2);
-        logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
+        streamDataLoader.loadIntoKafka(data);
+        streamDataLoader.loadIntoKafka(data2);
+        logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
 
         //csv data for H2 use
         List<TblColRef> tableColumns = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 31fc670..756ef36 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -34,13 +34,20 @@ import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 
 /**
  * Load prepared data into kafka(for test use)
  */
-public class KafkaDataLoader {
+public class KafkaDataLoader extends StreamDataLoader {
+    List<KafkaClusterConfig> kafkaClusterConfigs;
 
-    public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
+    public KafkaDataLoader(KafkaConfig kafkaConfig) {
+        super(kafkaConfig);
+        this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
+    }
+
+    public void loadIntoKafka(List<String> messages) {
 
         KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
         String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
new file mode 100644
index 0000000..50fc883
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import java.util.List;
+
+/** Abstract base for test stream-data loaders: holds the KafkaConfig, leaves loadIntoKafka to subclasses, and reports the topic in toString().
+ */
+public abstract class StreamDataLoader {
+    protected KafkaConfig kafkaConfig;
+    public StreamDataLoader(KafkaConfig kafkaConfig) {
+        this.kafkaConfig = kafkaConfig;
+    }
+
+    abstract public void loadIntoKafka(List<String> messages);
+
+    @Override
+    public String toString() {
+        return "kafka topic " + kafkaConfig.getTopic();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 3cd8a02..e9216f9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -20,9 +20,13 @@ package org.apache.kylin.source;
 
 import org.apache.kylin.metadata.model.TableDesc;
 
+import java.util.List;
+
 public interface ISource {
 
     public <I> I adaptToBuildEngine(Class<I> engineInterface);
 
     public ReadableTable createReadableTable(TableDesc tableDesc);
+
+    public List<String> getMRDependentResources(TableDesc table);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index f701a0f..e82c6ed 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
@@ -45,4 +46,8 @@ public class SourceFactory {
         return tableSource(table).adaptToBuildEngine(engineInterface);
     }
 
+    public static List<String> getMRDependentResources(TableDesc table) {
+        return tableSource(table).getMRDependentResources(table);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index ec9b1c6..5a098a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -99,6 +99,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index d7676f1..830de9e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -76,7 +76,7 @@ public class JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
-
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
         result.setMapReduceParams(cmd.toString());
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 5472928..26592cd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Matcher;
@@ -68,6 +69,7 @@ import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,6 +168,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
         String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
         String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
+        String kylinKafkaDependency = System.getProperty("kylin.kafka.dependency");
         logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
 
         Configuration jobConf = job.getConfiguration();
@@ -223,6 +226,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             }
         }
 
+        // for kafka dependencies
+        if (kylinKafkaDependency != null) {
+            kylinKafkaDependency = kylinKafkaDependency.replace(":", ",");
+
+            logger.info("Kafka Dependencies Before Filtered: " + kylinHiveDependency);
+
+            if (kylinDependency.length() > 0)
+                kylinDependency.append(",");
+            kylinDependency.append(kylinKafkaDependency);
+        } else {
+
+            logger.info("No Kafka dependency jars set in the environment, will find them from jvm:");
+
+            try {
+                String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
+                kylinDependency.append(kafkaClientJarPath).append(",");
+                logger.info("kafka jar file: " + kafkaClientJarPath);
+
+            } catch (ClassNotFoundException e) {
+                logger.error("Cannot found kafka dependency jars: " + e);
+            }
+        }
+
         // for KylinJobMRLibDir
         String mrLibDir = kylinConf.getKylinJobMRLibDir();
         if (!StringUtils.isBlank(mrLibDir)) {
@@ -433,6 +459,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
             TableDesc table = metaMgr.getTableDesc(tableName);
             dumpList.add(table.getResourcePath());
+            List<String> dependentResources = SourceFactory.getMRDependentResources(table);
+            dumpList.addAll(dependentResources);
         }
         for (CubeSegment segment : cube.getSegments()) {
             dumpList.addAll(segment.getDictionaryPaths());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 85ae9c7..d1893f4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -109,6 +109,7 @@ public class CuboidJob extends AbstractHadoopJob {
             }
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 90253ba..1bbe893 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -55,6 +55,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_SEGMENT_NAME);
             options.addOption(OPTION_STATISTICS_ENABLED);
@@ -63,6 +64,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 4b2ff37..62964bb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -97,6 +97,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             }
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
             logger.info("Starting: " + job.getJobName());
             
             setJobClasspath(job, cube.getConfig());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 0267db5..23e5b00 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -105,6 +105,9 @@
       "joint_dims" : [ ]
     }
   } ],
+  "override_kylin_properties": {
+    "kylin.cube.algorithm": "random"
+  },
   "notify_list" : [ ],
   "status_need_notify" : [ ],
   "auto_merge_time_ranges" : null,

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/kafka/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
index c97927d..6a64cce 100644
--- a/examples/test_case_data/localmeta/kafka/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
@@ -1,7 +1,7 @@
 {
  
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "default.streaming_table",
+  "name": "DEFAULT.STREAMING_TABLE",
   "topic": "test_streaming_table_topic_xyz",
   "timeout": 60000,
   "bufferSize": 65536,

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/streaming/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
index 6eb4a88..85a477b 100644
--- a/examples/test_case_data/localmeta/streaming/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
@@ -1,6 +1,6 @@
 {
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "default.streaming_table",
+  "name": "DEFAULT.STREAMING_TABLE",
   "type": "kafka",
   "last_modified": 0
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
index 6ead441..bba4b8b 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
@@ -40,5 +40,6 @@
     }
   ],
   "database": "DEFAULT",
-  "last_modified": 0
+  "last_modified": 0,
+  "source_type" : 1
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c426ea4..314276e 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -33,6 +33,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -98,7 +99,7 @@ public class BuildCubeWithStream {
         streamingConfig.setTopic(UUID.randomUUID().toString());
         KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
 
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig));
     }
 
     public static void afterClass() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index b7dbff0..e9cebea 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,11 +18,14 @@
 
 package org.apache.kylin.source.hive;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;
 
+import java.util.List;
+
 //used by reflection
 public class HiveSource implements ISource {
 
@@ -41,4 +44,9 @@ public class HiveSource implements ISource {
         return new HiveTable(tableDesc);
     }
 
+    @Override
+    public List<String> getMRDependentResources(TableDesc table) {
+        return Lists.newArrayList();
+    }
+
 }