You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/23 10:15:02 UTC

[18/50] [abbrv] kylin git commit: initial commit for KYLIN-1431

initial commit for KYLIN-1431


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9021f17d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9021f17d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9021f17d

Branch: refs/heads/1.5.x-HBase1.1.3
Commit: 9021f17d85be01bf34b48a7a31be82f53ceb9c8f
Parents: 0ec3ed0
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 2 11:16:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 15:07:04 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/CubeMetaExtractor.java |   2 +-
 build/bin/streaming_build.sh                    |   4 +-
 build/bin/streaming_fillgap.sh                  |   5 +-
 .../kylin/metadata/model/ISourceAware.java      |   1 +
 .../kylin/engine/streaming/BootstrapConfig.java |  20 +-
 .../kylin/engine/streaming/IStreamingInput.java |   3 +-
 .../streaming/OneOffStreamingBuilder.java       |  17 +-
 .../kylin/engine/streaming/StreamingConfig.java |  33 +--
 .../engine/streaming/StreamingManager.java      |  12 +
 .../engine/streaming/cli/StreamingCLI.java      |  21 +-
 .../engine/streaming/util/StreamingUtils.java   |  18 +-
 .../kafka/default.streaming_table.json          |  21 ++
 .../localmeta/kafka/kafka_test.json             |  20 --
 .../kafka/test_streaming_table_cube.json        |  22 --
 .../kafka/test_streaming_table_ii.json          |  22 --
 .../streaming/default.streaming_table.json      |   6 +
 .../localmeta/streaming/kafka_test.json         |  20 --
 .../streaming/test_streaming_table_cube.json    |   8 -
 .../streaming/test_streaming_table_ii.json      |   8 -
 .../kylin/provision/BuildCubeWithStream.java    |  16 +-
 .../kylin/rest/controller/CubeController.java   | 234 -------------------
 .../rest/controller/StreamingController.java    |   4 +-
 .../kylin/rest/service/StreamingService.java    |  18 +-
 .../kylin/source/kafka/KafkaStreamingInput.java |  78 ++++---
 .../kylin/source/kafka/StreamingParser.java     |   6 +-
 25 files changed, 163 insertions(+), 456 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
index 527ef0a..ef27ade 100644
--- a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
+++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
@@ -227,7 +227,7 @@ public class CubeMetaExtractor extends AbstractApplication {
 
     private void dealWithStreaming(CubeInstance cube) {
         for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
-            if (streamingConfig.getCubeName() != null && streamingConfig.getCubeName().equalsIgnoreCase(cube.getName())) {
+            if (streamingConfig.getName() != null && streamingConfig.getName().equalsIgnoreCase(cube.getFactTable())) {
                 requiredResources.add(StreamingConfig.concatResourcePath(streamingConfig.getName()));
                 requiredResources.add(KafkaConfig.concatResourcePath(streamingConfig.getName()));
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
index a96ecc1..ed19036 100644
--- a/build/bin/streaming_build.sh
+++ b/build/bin/streaming_build.sh
@@ -20,7 +20,7 @@
 source /etc/profile
 source ~/.bash_profile
 
-STREAMING=$1
+CUBE=$1
 INTERVAL=$2
 DELAY=$3
 CURRENT_TIME_IN_SECOND=`date +%s`
@@ -30,4 +30,4 @@ END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
 
 ID="$START"_"$END"
 echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -start ${START} -end ${END} -streaming ${STREAMING}
\ No newline at end of file
+sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
index 74d9037..c67809a 100644
--- a/build/bin/streaming_fillgap.sh
+++ b/build/bin/streaming_fillgap.sh
@@ -20,8 +20,7 @@
 source /etc/profile
 source ~/.bash_profile
 
-streaming=$1
-margin=$2
+cube=$1
 
 cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
\ No newline at end of file
+sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 3d89f40..8cfda15 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model;
 public interface ISourceAware {
 
     public static final int ID_HIVE = 0;
+    public static final int ID_STREAMING = 1;
     public static final int ID_SPARKSQL = 5;
 
     int getSourceType();

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index a3e2db5..a4c4618 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -4,9 +4,7 @@ package org.apache.kylin.engine.streaming;
  */
 public class BootstrapConfig {
 
-    private String streaming;
-    private int partitionId = -1;
-
+    private String cubeName;
     private long start = 0L;
     private long end = 0L;
 
@@ -28,20 +26,12 @@ public class BootstrapConfig {
         this.end = end;
     }
 
-    public String getStreaming() {
-        return streaming;
-    }
-
-    public void setStreaming(String streaming) {
-        this.streaming = streaming;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
+    public String getCubeName() {
+        return cubeName;
     }
 
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
+    public void setCubeName(String cubeName) {
+        this.cubeName = cubeName;
     }
 
     public boolean isFillGap() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
index 1cf3d98..4b4cf02 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
@@ -34,11 +34,12 @@
 package org.apache.kylin.engine.streaming;
 
 import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.metadata.realization.RealizationType;
 
 /**
  */
 public interface IStreamingInput {
 
-    StreamingBatch getBatchWithTimeWindow(String streamingConfig, int id, long startTime, long endTime);
+    StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime);
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
index 3fbade2..6bad000 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
@@ -43,6 +43,7 @@ import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.realization.RealizationType;
 
 /**
  */
@@ -53,23 +54,25 @@ public class OneOffStreamingBuilder {
     private final StreamingBatchBuilder streamingBatchBuilder;
     private final long startTime;
     private final long endTime;
-    private final String streamingConfig;
+    private final RealizationType realizationType;
+    private final String realizationName;
 
-    public OneOffStreamingBuilder(String streamingConfig, long startTime, long endTime) {
+    public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) {
         Preconditions.checkArgument(startTime < endTime);
         this.startTime = startTime;
         this.endTime = endTime;
-        this.streamingConfig = Preconditions.checkNotNull(streamingConfig);
-        this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput(streamingConfig));
-        this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput(streamingConfig));
-        this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(streamingConfig));
+        this.realizationType = Preconditions.checkNotNull(realizationType);
+        this.realizationName = Preconditions.checkNotNull(realizationName);
+        this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput());
+        this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput());
+        this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName));
     }
 
     public Runnable build() {
         return new Runnable() {
             @Override
             public void run() {
-                StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(streamingConfig, -1, startTime, endTime);
+                StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime);
                 final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch);
                 final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch);
                 final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable);

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index f0a7ab1..c8d1911 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -55,29 +55,24 @@ public class StreamingConfig extends RootPersistentEntity {
 
     public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
 
+    public static final String STREAMING_TYPE_KAFKA = "kafka";
+
     @JsonProperty("name")
     private String name;
 
-    @JsonProperty("iiName")
-    private String iiName;
-
-    @JsonProperty("cubeName")
-    private String cubeName;
+    @JsonProperty("type")
+    private String type = STREAMING_TYPE_KAFKA;
 
-    public String getCubeName() {
-        return cubeName;
+    public String getType() {
+        return type;
     }
 
-    public void setCubeName(String cubeName) {
-        this.cubeName = cubeName;
+    public void setType(String type) {
+        this.type = type;
     }
 
-    public String getIiName() {
-        return iiName;
-    }
-
-    public void setIiName(String iiName) {
-        this.iiName = iiName;
+    public String getResourcePath() {
+        return concatResourcePath(name);
     }
 
     public String getName() {
@@ -88,12 +83,8 @@ public class StreamingConfig extends RootPersistentEntity {
         this.name = name;
     }
 
-    public String getResourcePath() {
-        return concatResourcePath(name);
-    }
-
-    public static String concatResourcePath(String streamingName) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json";
+    public static String concatResourcePath(String name) {
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index e0b086d..f652762 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -104,6 +104,18 @@ public class StreamingManager {
         }
     }
 
+    private static String formatStreamingConfigPath(String name) {
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+    }
+
+    private static String formatStreamingOutputPath(String streaming, int partition) {
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
+    }
+
+    private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+    }
+
     public StreamingConfig getStreamingConfig(String name) {
         return streamingMap.get(name);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index a73a6ac..0bab396 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -44,6 +44,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +56,7 @@ public class StreamingCLI {
 
     public static void main(String[] args) {
         try {
-            Preconditions.checkArgument(args[0].equals("streaming"));
+            Preconditions.checkArgument(args[0].equals("cube"));
             Preconditions.checkArgument(args[1].equals("start"));
 
             int i = 2;
@@ -69,11 +70,8 @@ public class StreamingCLI {
                 case "-end":
                     bootstrapConfig.setEnd(Long.parseLong(args[++i]));
                     break;
-                case "-streaming":
-                    bootstrapConfig.setStreaming(args[++i]);
-                    break;
-                case "-partition":
-                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+                case "-cube":
+                    bootstrapConfig.setCubeName(args[++i]);
                     break;
                 case "-fillGap":
                     bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
@@ -84,14 +82,13 @@ public class StreamingCLI {
                 i++;
             }
             if (bootstrapConfig.isFillGap()) {
-                final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
-                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName());
                 logger.info("all gaps:" + StringUtils.join(gaps, ","));
                 for (Pair<Long, Long> gap : gaps) {
-                    startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
+                    startOneOffCubeStreaming(bootstrapConfig.getCubeName(), gap.getFirst(), gap.getSecond());
                 }
             } else {
-                startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+                startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
                 logger.info("streaming process finished, exit with 0");
                 System.exit(0);
             }
@@ -102,8 +99,8 @@ public class StreamingCLI {
         }
     }
     
-    private static void startOneOffCubeStreaming(String streaming, long start, long end) {
-        final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build();
+    private static void startOneOffCubeStreaming(String cubeName, long start, long end) {
+        final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build();
         runnable.run();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
index 0ae7143..66a0af2 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
@@ -43,29 +43,27 @@ import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder;
 
 import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.realization.RealizationType;
 
 /**
  * TODO: like MRUtil, use Factory pattern to allow config
  */
 public class StreamingUtils {
 
-    public static IStreamingInput getStreamingInput(String streaming) {
+    public static IStreamingInput getStreamingInput() {
         return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput");
     }
 
-    public static IStreamingOutput getStreamingOutput(String streaming) {
+    public static IStreamingOutput getStreamingOutput() {
         return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
     }
 
-    public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) {
-        final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streaming);
-        Preconditions.checkNotNull(streamingConfig);
-        if (streamingConfig.getCubeName() != null) {
-            return new StreamingCubeBuilder(streamingConfig.getCubeName());
-        } else if (streamingConfig.getIiName() != null) {
-            throw new UnsupportedOperationException("not implemented yet");
+    public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) {
+        Preconditions.checkNotNull(realizationName);
+        if (realizationType == RealizationType.CUBE) {
+            return new StreamingCubeBuilder(realizationName);
         } else {
-            throw new UnsupportedOperationException("StreamingConfig is not valid");
+            throw new UnsupportedOperationException("not implemented yet");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
new file mode 100644
index 0000000..c99b8e5
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
@@ -0,0 +1,21 @@
+{
+  "version":"2.1",
+  "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
+  "name": "default.streaming_table",
+  "topic": "test_streaming_table_topic_xyz",
+  "timeout": 60000,
+  "bufferSize": 65536,
+  "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox",
+          "port": 6667
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/kafka_test.json b/examples/test_case_data/localmeta/kafka/kafka_test.json
deleted file mode 100644
index a20f71e..0000000
--- a/examples/test_case_data/localmeta/kafka/kafka_test.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "version":"2.1",
-  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
-  "name": "kafka_test",
-  "topic": "kafka_stream_test",
-  "timeout": 60000,
-  "bufferSize": 65536,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox.hortonworks.com",
-          "port": 6667
-        }
-      ]
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
deleted file mode 100644
index 554fa62..0000000
--- a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "version":"2.1",
-  "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "test_streaming_table_cube",
-  "topic": "test_streaming_table_topic_xyz",
-  "timeout": 60000,
-  "bufferSize": 65536,
-  "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
-  "partition": 1,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox",
-          "port": 6667
-        }
-      ]
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
deleted file mode 100644
index b6f18c7..0000000
--- a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "version":"2.1",
-  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
-  "name": "test_streaming_table_ii",
-  "topic": "test_streaming_table_topic_xyz",
-  "timeout": 60000,
-  "bufferSize": 65536,
-  "parserName": "org.apache.kylin.source.kafka.JsonStreamParser",
-  "partition": 1,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox",
-          "port": 6667
-        }
-      ]
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
new file mode 100644
index 0000000..6eb4a88
--- /dev/null
+++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
@@ -0,0 +1,6 @@
+{
+  "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
+  "name": "default.streaming_table",
+  "type": "kafka",
+  "last_modified": 0
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/kafka_test.json b/examples/test_case_data/localmeta/streaming/kafka_test.json
deleted file mode 100644
index a20f71e..0000000
--- a/examples/test_case_data/localmeta/streaming/kafka_test.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "version":"2.1",
-  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
-  "name": "kafka_test",
-  "topic": "kafka_stream_test",
-  "timeout": 60000,
-  "bufferSize": 65536,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox.hortonworks.com",
-          "port": 6667
-        }
-      ]
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
deleted file mode 100644
index ecf0511..0000000
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
+++ /dev/null
@@ -1,8 +0,0 @@
-{
-  "version":"2.1",
-  "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "test_streaming_table_cube",
-  "cubeName": "test_streaming_table_cube",
-  "partition": 1,
-  "last_modified": 0
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
deleted file mode 100644
index 022ab70..0000000
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
+++ /dev/null
@@ -1,8 +0,0 @@
-{
-  "version":"2.1",
-  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
-  "name": "test_streaming_table_ii",
-  "iiName": "test_streaming_table_ii",
-  "partition": 1,
-  "last_modified": 0
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 27226e7..eeff999 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -27,10 +27,13 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
@@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory;
 public class BuildCubeWithStream {
 
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class);
-    private static final String streamingName = "test_streaming_table_cube";
+    private static final String cubeName = "test_streaming_table_cube";
     private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
     private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
     private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
@@ -75,15 +78,16 @@ public class BuildCubeWithStream {
         DeployUtil.overrideJobJarLocations();
 
         kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final String factTable = cubeInstance.getFactTable();
+        final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable);
 
         //Use a random topic for kafka data stream
-        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName);
+        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName());
         streamingConfig.setTopic(UUID.randomUUID().toString());
         KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
 
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig);
     }
 
     public static void afterClass() throws Exception {
@@ -94,7 +98,7 @@ public class BuildCubeWithStream {
         logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval));
         for (long start = startTime; start < endTime; start += batchInterval) {
             logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval));
-            new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run();
+            new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 9afa750..e60f330 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -284,92 +284,6 @@ public class CubeController extends BasicController {
             throw new InternalErrorException("Failed to clone cube ", e);
         }
 
-        boolean isStreamingCube = false, cloneStreamingConfigSuccess = false, cloneKafkaConfigSuccess = false;
-
-
-        List<StreamingConfig> streamingConfigs = null;
-        try {
-            streamingConfigs = streamingService.listAllStreamingConfigs(cubeName);
-            if (streamingConfigs.size() != 0) {
-                isStreamingCube = true;
-            }
-
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        StreamingConfig newStreamingConfig = null;
-        KafkaConfig newKafkaConfig = null;
-
-        try {
-
-            if (isStreamingCube) {
-
-                isStreamingCube = true;
-                newStreamingConfig = streamingConfigs.get(0).clone();
-                newStreamingConfig.setName(newCubeName + "_STREAMING");
-                newStreamingConfig.updateRandomUuid();
-                newStreamingConfig.setLastModified(0);
-                newStreamingConfig.setCubeName(newCubeName);
-                try {
-                    streamingService.createStreamingConfig(newStreamingConfig);
-                    cloneStreamingConfigSuccess = true;
-                } catch (IOException e) {
-                    throw new InternalErrorException("Failed to clone streaming config. ", e);
-                }
-
-                //StreamingConfig name and KafkaConfig name is the same for same cube
-                String kafkaConfigName = streamingConfigs.get(0).getName();
-                KafkaConfig kafkaConfig = null;
-                try {
-                    kafkaConfig = kafkaConfigService.getKafkaConfig(kafkaConfigName);
-                    if (kafkaConfig != null) {
-                        newKafkaConfig = kafkaConfig.clone();
-                        newKafkaConfig.setName(newStreamingConfig.getName());
-                        newKafkaConfig.setLastModified(0);
-                        newKafkaConfig.updateRandomUuid();
-                    }
-                } catch (IOException e) {
-                    throw new InternalErrorException("Failed to get kafka config info. ", e);
-                }
-
-                try {
-                    kafkaConfigService.createKafkaConfig(newKafkaConfig);
-                    cloneKafkaConfigSuccess = true;
-                } catch (IOException e) {
-                    throw new InternalErrorException("Failed to clone streaming config. ", e);
-                }
-            }
-        } finally {
-
-            //rollback if failed
-            if (isStreamingCube) {
-                if (cloneStreamingConfigSuccess == false || cloneKafkaConfigSuccess == false) {
-                    try {
-                        cubeService.deleteCube(newCube);
-                    } catch (Exception ex) {
-                        throw new InternalErrorException("Failed, and failed to rollback on delete cube. " + " Caused by: " + ex.getMessage(), ex);
-                    }
-                    if (cloneStreamingConfigSuccess == true) {
-                        try {
-                            streamingService.dropStreamingConfig(newStreamingConfig);
-                        } catch (IOException e) {
-                            throw new InternalErrorException("Failed to clone cube, and StreamingConfig created and failed to delete: " + e.getLocalizedMessage());
-                        }
-                    }
-                    if (cloneKafkaConfigSuccess == true) {
-                        try {
-                            kafkaConfigService.dropKafkaConfig(newKafkaConfig);
-                        } catch (IOException e) {
-                            throw new InternalErrorException("Failed to clone cube, and KafkaConfig created and failed to delete: " + e.getLocalizedMessage());
-                        }
-                    }
-
-                }
-
-            }
-        }
-
         return newCube;
 
     }
@@ -400,27 +314,6 @@ public class CubeController extends BasicController {
             throw new NotFoundException("Cube with name " + cubeName + " not found..");
         }
 
-        //drop related StreamingConfig KafkaConfig if exist
-        try {
-            List<StreamingConfig> configs = streamingService.listAllStreamingConfigs(cubeName);
-            for (StreamingConfig config : configs) {
-                try {
-                    streamingService.dropStreamingConfig(config);
-                } catch (IOException e) {
-                    logger.error(e.getLocalizedMessage(), e);
-                    throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
-                }
-                try {
-                    KafkaConfig kfkConfig = kafkaConfigService.getKafkaConfig(config.getName());
-                    kafkaConfigService.dropKafkaConfig(kfkConfig);
-                } catch (IOException e) {
-                    throw new InternalErrorException("Failed to delete KafkaConfig. " + " Caused by: " + e.getMessage(), e);
-                }
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
         //drop Cube
         try {
             cubeService.deleteCube(cube);
@@ -587,133 +480,6 @@ public class CubeController extends BasicController {
             return cubeRequest;
         }
 
-        boolean updateStreamingConfigSuccess = false, updateKafkaConfigSuccess = false;
-
-        boolean isStreamingCube = cubeRequest.getStreamingCube() != null && cubeRequest.getStreamingCube().equals("true");
-
-        //oldConfig is for recover use
-        StreamingConfig streamingConfig = null, oldStreamingConfig = null;
-        KafkaConfig kafkaConfig = null, oldKafkaConfig = null;
-        if (isStreamingCube) {
-            streamingConfig = deserializeStreamingDesc(cubeRequest);
-            kafkaConfig = deserializeKafkaDesc(cubeRequest);
-            try {
-                oldKafkaConfig = kafkaConfigService.getKafkaConfig(kafkaConfig.getName());
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            oldStreamingConfig = streamingService.getStreamingManager().getStreamingConfig(streamingConfig.getName());
-        }
-        try {
-            //streaming Cube
-            if (isStreamingCube) {
-                if (streamingConfig == null) {
-                    cubeRequest.setMessage("No StreamingConfig info to update.");
-                    return cubeRequest;
-                }
-                if (kafkaConfig == null) {
-                    cubeRequest.setMessage("No KafkaConfig info to update.");
-                    return cubeRequest;
-                }
-
-                if (oldStreamingConfig == null) {
-                    streamingConfig.setUuid(UUID.randomUUID().toString());
-                    try {
-                        streamingService.createStreamingConfig(streamingConfig);
-                        updateStreamingConfigSuccess = true;
-                    } catch (IOException e) {
-                        logger.error("Failed to add StreamingConfig:" + e.getLocalizedMessage(), e);
-                        throw new InternalErrorException("Failed to add StreamingConfig: " + e.getLocalizedMessage());
-                    }
-                } else {
-                    try {
-                        streamingConfig = streamingService.updateStreamingConfig(streamingConfig);
-                        updateStreamingConfigSuccess = true;
-
-                    } catch (IOException e) {
-                        logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e);
-                        throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage());
-                    }
-                }
-                if (oldKafkaConfig == null) {
-                    kafkaConfig.setUuid(UUID.randomUUID().toString());
-                    try {
-                        kafkaConfigService.createKafkaConfig(kafkaConfig);
-                        updateKafkaConfigSuccess = true;
-                    } catch (IOException e) {
-                        logger.error("Failed to add KafkaConfig:" + e.getLocalizedMessage(), e);
-                        throw new InternalErrorException("Failed to add KafkaConfig: " + e.getLocalizedMessage());
-                    }
-
-                } else {
-                    try {
-                        kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig);
-                        updateKafkaConfigSuccess = true;
-                    } catch (IOException e) {
-                        logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e);
-                        throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage());
-                    }
-                }
-
-            }
-        } finally {
-            if (isStreamingCube) {
-                //recover cube desc
-                if (updateStreamingConfigSuccess == false || updateKafkaConfigSuccess == false) {
-                    oldCubeDesc.setLastModified(desc.getLastModified());
-                    CubeInstance cube = cubeService.getCubeManager().getCube(cubeRequest.getCubeName());
-                    try {
-                        desc = cubeService.updateCubeAndDesc(cube, oldCubeDesc, projectName);
-                    } catch (Exception e) {
-                        logger.error("Failed to recover CubeDesc:" + e.getLocalizedMessage(), e);
-                        throw new InternalErrorException("Failed to recover CubeDesc: " + e.getLocalizedMessage());
-                    }
-
-                    if (updateStreamingConfigSuccess == true) {
-
-                        if (oldStreamingConfig != null) {
-
-                            oldStreamingConfig.setLastModified(streamingConfig.getLastModified());
-                            try {
-                                streamingService.updateStreamingConfig(oldStreamingConfig);
-                            } catch (IOException e) {
-                                logger.error("Failed to recover StreamingConfig:" + e.getLocalizedMessage(), e);
-                                throw new InternalErrorException("Failed to recover StreamingConfig: " + e.getLocalizedMessage());
-                            }
-                        } else {
-                            try {
-                                streamingService.dropStreamingConfig(streamingConfig);
-                            } catch (IOException e) {
-                                logger.error("Failed to remove added StreamingConfig:" + e.getLocalizedMessage(), e);
-                                throw new InternalErrorException("Failed to remove added StreamingConfig: " + e.getLocalizedMessage());
-                            }
-                        }
-                    }
-
-                    if (updateKafkaConfigSuccess == true) {
-                        if (oldKafkaConfig != null) {
-                            oldKafkaConfig.setLastModified(kafkaConfig.getLastModified());
-                            try {
-                                kafkaConfigService.updateKafkaConfig(oldKafkaConfig);
-                            } catch (IOException e) {
-                                logger.error("Failed to recover KafkaConfig:" + e.getLocalizedMessage(), e);
-                                throw new InternalErrorException("Failed to recover KafkaConfig: " + e.getLocalizedMessage());
-                            }
-                        } else {
-                            try {
-                                kafkaConfigService.dropKafkaConfig(kafkaConfig);
-                            } catch (IOException e) {
-                                logger.error("Failed to remove added KafkaConfig:" + e.getLocalizedMessage(), e);
-                                throw new InternalErrorException("Failed to remove added KafkaConfig: " + e.getLocalizedMessage());
-                            }
-                        }
-                    }
-
-                }
-            }
-
-        }
-
         String descData = JsonUtil.writeValueAsIndentString(desc);
         cubeRequest.setCubeDescData(descData);
         cubeRequest.setSuccessful(true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e22bd30..ecd7571 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -60,9 +60,9 @@ public class StreamingController extends BasicController {
 
     @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
     @ResponseBody
-    public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+    public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String table, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
         try {
-            return streamingService.getStreamingConfigs(cubeName, limit, offset);
+            return streamingService.getStreamingConfigs(table, limit, offset);
         } catch (IOException e) {
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
             throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e40426b..a0473e9 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.rest.service;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.rest.constant.Constant;
@@ -37,26 +39,22 @@ public class StreamingService extends BasicService {
     private AccessService accessService;
 
     @PostFilter(Constant.ACCESS_POST_FILTER_READ)
-    public List<StreamingConfig> listAllStreamingConfigs(final String cubeName) throws IOException {
+    public List<StreamingConfig> listAllStreamingConfigs(final String table) throws IOException {
         List<StreamingConfig> streamingConfigs = new ArrayList();
-        CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
-        if (null == cubeInstance) {
+        if (StringUtils.isEmpty(table)) {
             streamingConfigs = getStreamingManager().listAllStreaming();
         } else {
-            for(StreamingConfig config : getStreamingManager().listAllStreaming()){
-                if(cubeInstance.getName().equals(config.getCubeName())){
-                    streamingConfigs.add(config);
-                }
-            }
+            StreamingConfig config = getStreamingManager().getConfig(table);
+            streamingConfigs.add(config);
         }
 
         return streamingConfigs;
     }
 
-    public List<StreamingConfig> getStreamingConfigs(final String cubeName, final Integer limit, final Integer offset) throws IOException {
+    public List<StreamingConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset) throws IOException {
 
         List<StreamingConfig> streamingConfigs;
-        streamingConfigs = listAllStreamingConfigs(cubeName);
+        streamingConfigs = listAllStreamingConfigs(table);
 
         if (limit == null || offset == null) {
             return streamingConfigs;

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 2e262b3..c05119f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -47,9 +47,14 @@ import kafka.message.MessageAndOffset;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.streaming.IStreamingInput;
 import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaRequester;
@@ -65,39 +70,54 @@ public class KafkaStreamingInput implements IStreamingInput {
     private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);
 
     @Override
-    public StreamingBatch getBatchWithTimeWindow(String streaming, int id, long startTime, long endTime) {
-        try {
+    public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) {
+        if (realizationType != RealizationType.CUBE) {
+            throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType);
+        }
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName);
+        final String streaming = cube.getFactTable();
+        final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
+        final StreamingConfig streamingConfig = streamingManager.getConfig(streaming);
+        if (streamingConfig == null) {
+            throw new IllegalArgumentException("Table " + streaming + " is not a streaming table.");
+        }
+        if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) {
             logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
-            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
-            final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
-            final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig);
-            final ExecutorService executorService = Executors.newCachedThreadPool();
-            final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
-            for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
-                final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-                for (int i = 0; i < partitionCount; ++i) {
-                    final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
-                    final Future<List<StreamingMessage>> future = executorService.submit(producer);
-                    futures.add(future);
+
+            try {
+                final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+                final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
+                final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig, realizationType, realizationName);
+                final ExecutorService executorService = Executors.newCachedThreadPool();
+                final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
+                for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
+                    final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+                    for (int i = 0; i < partitionCount; ++i) {
+                        final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
+                        final Future<List<StreamingMessage>> future = executorService.submit(producer);
+                        futures.add(future);
+                    }
                 }
-            }
-            List<StreamingMessage> messages = Lists.newLinkedList();
-            for (Future<List<StreamingMessage>> future : futures) {
-                try {
-                    messages.addAll(future.get());
-                } catch (InterruptedException e) {
-                    logger.warn("this thread should not be interrupted, just ignore", e);
-                    continue;
-                } catch (ExecutionException e) {
-                    throw new RuntimeException("error when get StreamingMessages",e.getCause());
+                List<StreamingMessage> messages = Lists.newLinkedList();
+                for (Future<List<StreamingMessage>> future : futures) {
+                    try {
+                        messages.addAll(future.get());
+                    } catch (InterruptedException e) {
+                        logger.warn("this thread should not be interrupted, just ignore", e);
+                        continue;
+                    } catch (ExecutionException e) {
+                        throw new RuntimeException("error when get StreamingMessages", e.getCause());
+                    }
                 }
+                final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
+                logger.info("finish to get streaming batch, total message count:" + messages.size());
+                return new StreamingBatch(messages, timeRange);
+            } catch (ReflectiveOperationException e) {
+                throw new RuntimeException("failed to create instance of StreamingParser", e);
             }
-            final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
-            logger.info("finish to get streaming batch, total message count:" + messages.size());
-            return new StreamingBatch(messages, timeRange);
-        } catch (ReflectiveOperationException e) {
-            throw new RuntimeException("failed to create instance of StreamingParser", e);
+        } else {
+            throw new IllegalArgumentException("kafka is the only supported streaming type.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 3455f1d..7b326e2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -50,6 +50,7 @@ import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 
 import com.google.common.base.Function;
@@ -68,9 +69,8 @@ public abstract class StreamingParser {
 
     abstract public boolean filter(StreamingMessage streamingMessage);
 
-    public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig) throws ReflectiveOperationException {
-        final String cubeName = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(kafkaConfig.getName()).getCubeName();
-        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+    public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig, RealizationType realizationType, String realizationName) throws ReflectiveOperationException {
+        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(realizationName);
         List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
             @Nullable
             @Override