You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by zh...@apache.org on 2015/10/22 12:06:53 UTC

[1/5] incubator-kylin git commit: KYLIN-1041, Streaming UI

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging a8ead00fa -> aeac6337b


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/bower.json
----------------------------------------------------------------------
diff --git a/webapp/bower.json b/webapp/bower.json
index e0bccb8..ab3ad28 100755
--- a/webapp/bower.json
+++ b/webapp/bower.json
@@ -29,7 +29,8 @@
     "angular-cookies": "1.2",
     "angular-bootstrap-nav-tree": "*",
     "components-font-awesome": "~4.3.0",
-    "bootstrap-sweetalert": "~0.4.3"
+    "bootstrap-sweetalert": "~0.4.3",
+    "angular-toggle-switch":"1.3.0"
   },
   "devDependencies": {
     "less.js": "~1.4.0",

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/grunt.json
----------------------------------------------------------------------
diff --git a/webapp/grunt.json b/webapp/grunt.json
index d89af30..1750014 100755
--- a/webapp/grunt.json
+++ b/webapp/grunt.json
@@ -40,6 +40,7 @@
                 "app/components/angular-underscore/angular-underscore.js",
                 "app/components/jquery-ui/jquery-ui.min.js",
                 "app/components/angular-ui-sortable/sortable.js",
+                "app/components/angular-toggle-switch/angular-toggle-switch.js",
                 "tmp/js/scripts.js"
             ],
             "dest": "tmp/js/scripts.min.js"
@@ -62,6 +63,7 @@
                 "app/css/skins/_all-skins.min.css",
                 "app/components/bootstrap-sweetalert/lib/sweet-alert.css",
                 "app/components/angular-bootstrap-nav-tree/dist/abn_tree.css",
+                "app/components/angular-toggle-switch/angular-toggle-switch.css",
                 "tmp/css/styles.css"
             ],
             "dest": "tmp/css/styles.min.css"


[4/5] incubator-kylin git commit: KYLIN-1041, Streaming UI

Posted by zh...@apache.org.
KYLIN-1041, Streaming UI


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/53b383d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/53b383d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/53b383d9

Branch: refs/heads/2.x-staging
Commit: 53b383d9dc814b9fa53feae2adbbdd99e83be6b1
Parents: a8ead00
Author: jiazhong <ji...@ebay.com>
Authored: Fri Sep 25 17:46:11 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Thu Oct 22 18:05:59 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/Broadcaster.java    |    2 +-
 .../org/apache/kylin/cube/CubeDescManager.java  |    2 +-
 .../kylin/engine/streaming/StreamingConfig.java |    9 +
 .../engine/streaming/StreamingManager.java      |  163 ++-
 .../kylin/rest/controller/CubeController.java   |  219 +++-
 .../kylin/rest/controller/ModelController.java  |    2 +-
 .../rest/controller/StreamingController.java    |  225 ++++
 .../apache/kylin/rest/request/CubeRequest.java  |   26 +
 .../kylin/rest/request/StreamingRequest.java    |   41 +
 .../apache/kylin/rest/service/BasicService.java |   10 +
 .../apache/kylin/rest/service/CacheService.java |  152 ++-
 .../kylin/rest/service/KafkaConfigService.java  |   94 ++
 .../kylin/rest/service/StreamingService.java    |   90 ++
 server/src/main/resources/kylinSecurity.xml     |    2 +
 .../kylin/source/kafka/KafkaConfigManager.java  |  140 +-
 .../kylin/source/kafka/config/KafkaConfig.java  |    9 +
 webapp/app/index.html                           |   32 +-
 webapp/app/js/app.js                            |    2 +-
 webapp/app/js/controllers/cubeAdvanceSetting.js |   28 +
 webapp/app/js/controllers/cubeEdit.js           | 1216 ++++++++++--------
 webapp/app/js/controllers/cubeFilter.js         |   23 -
 webapp/app/js/controllers/cubeMeasures.js       |   96 ++
 webapp/app/js/controllers/cubeSchema.js         |  130 +-
 webapp/app/js/controllers/cubes.js              |   21 +-
 webapp/app/js/controllers/modelSchema.js        |  429 +++---
 webapp/app/js/controllers/sourceMeta.js         |   10 +-
 webapp/app/js/controllers/streamingConfig.js    |   75 ++
 .../app/js/controllers/streamingKafkaConfig.js  |   23 +
 webapp/app/js/model/cubeConfig.js               |    3 +-
 webapp/app/js/model/cubeListModel.js            |    2 +-
 webapp/app/js/model/streamingListModel.js       |   80 ++
 webapp/app/js/model/streamingModel.js           |   66 +
 webapp/app/js/services/streaming.js             |   28 +
 webapp/app/partials/cubeDesigner/filter.html    |   66 -
 webapp/app/partials/cubeDesigner/info.html      |    4 +-
 .../cubeDesigner/kafkaAdvancedConfig.html       |  165 +++
 .../partials/cubeDesigner/kafkaBasicConfig.html |  114 ++
 webapp/app/partials/cubeDesigner/measures.html  |  360 +++---
 .../partials/cubeDesigner/streamingConfig.html  |  282 ++++
 webapp/app/partials/cubes/cube_detail.html      |   10 +-
 webapp/app/partials/cubes/cubes.html            |    7 +
 .../modelDesigner/conditions_settings.html      |    6 +-
 .../app/partials/modelDesigner/data_model.html  |    2 +-
 .../app/partials/modelDesigner/model_info.html  |    2 +-
 webapp/app/partials/models/models_tree.html     |    4 -
 .../app/partials/streaming/streaming_edit.html  |   34 +
 .../partials/streaming/streaming_schema.html    |   63 +
 .../app/partials/tables/source_table_tree.html  |    2 +
 webapp/app/partials/tables/table_detail.html    |  113 --
 webapp/app/partials/tables/table_load.html      |  130 ++
 webapp/bower.json                               |    3 +-
 webapp/grunt.json                               |    2 +
 52 files changed, 3473 insertions(+), 1346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 0b6152f..80ec33c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -138,7 +138,7 @@ public class Broadcaster {
     }
 
     public enum TYPE {
-        ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+        ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
         private String text;
 
         TYPE(String text) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index b5e3eb2..8e75d29 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -228,7 +228,7 @@ public class CubeDescManager {
      * @return
      * @throws IOException
      */
-    public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
+        public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
         // Validate CubeDesc
         if (desc.getUuid() == null || desc.getName() == null) {
             throw new IllegalArgumentException();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index 3b3b4f7..a7c6da0 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -41,6 +41,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
 
@@ -98,6 +99,14 @@ public class StreamingConfig extends RootPersistentEntity {
         this.margin = margin;
     }
 
+    public String getResourcePath() {
+        return concatResourcePath(name);
+    }
+
+    public static String concatResourcePath(String streamingName) {
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json";
+    }
+
     @Override
     public StreamingConfig clone() {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 0e94b04..112379e 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -40,15 +40,17 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,10 +70,21 @@ public class StreamingManager {
     // static cached instances
     private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
 
+    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+
     private KylinConfig config;
 
-    private StreamingManager(KylinConfig config) {
+    // name ==> StreamingConfig
+    private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING);
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
+        reloadAllStreaming();
     }
 
     private ResourceStore getStore() {
@@ -89,12 +102,16 @@ public class StreamingManager {
             if (r != null) {
                 return r;
             }
+            try{
             r = new StreamingManager(config);
             CACHE.put(config, r);
             if (CACHE.size() > 1) {
-                logger.warn("More than one cubemanager singleton exist");
+                logger.warn("More than one streamingManager singleton exist");
             }
             return r;
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
+        }
         }
     }
 
@@ -114,32 +131,96 @@ public class StreamingManager {
         return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
     }
 
-    public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
-        try {
-            getStore().putResource(formatStreamingConfigPath(name), config, StreamingConfig.SERIALIZER);
-            return true;
-        } catch (IOException e) {
-            logger.error("error save resource name:" + name, e);
-            throw new RuntimeException("error save resource name:" + name, e);
-        }
-    }
 
     public StreamingConfig getStreamingConfig(String name) {
-        try {
-            return getStore().getResource(formatStreamingConfigPath(name), StreamingConfig.class, StreamingConfig.SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get resource name:" + name, e);
-            throw new RuntimeException("error get resource name:" + name, e);
+        return streamingMap.get(name);
+    }
+
+    public List<StreamingConfig> listAllStreaming() {
+        return new ArrayList<>(streamingMap.values());
+    }
+
+
+    /**
+     * Reload StreamingConfig from the resource store. It will be triggered by a
+     * desc update event.
+     *
+     * @param name
+     * @throws IOException
+     */
+    public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
+
+        // Resolve the resource path
+        String path = StreamingConfig.concatResourcePath(name);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+
+        // Here replace the old one
+        streamingMap.putLocal(ndesc.getName(), ndesc);
+        return ndesc;
+    }
+
+    // remove streamingConfig
+    public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+        String path = streamingConfig.getResourcePath();
+        getStore().deleteResource(path);
+        streamingMap.remove(streamingConfig.getName());
+    }
+
+
+    public StreamingConfig getConfig(String name) {
+        name = name.toUpperCase();
+        return streamingMap.get(name);
+    }
+
+    public void removeStreamingLocal(String streamingName) {
+        streamingMap.removeLocal(streamingName);
+    }
+
+    /**
+     * Update StreamingConfig with the input. Broadcast the event into cluster
+     *
+     * @param desc
+     * @return
+     * @throws IOException
+     */
+    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
+        // Validate StreamingConfig
+        if (desc.getUuid() == null || desc.getName() == null) {
+            throw new IllegalArgumentException();
+        }
+        String name = desc.getName();
+        if (!streamingMap.containsKey(name)) {
+            throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
         }
+
+
+        // Save Source
+        String path = desc.getResourcePath();
+        getStore().putResource(path, desc, STREAMING_SERIALIZER);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+        // Here replace the old one
+        streamingMap.put(ndesc.getName(), desc);
+
+        return ndesc;
     }
 
-    public void saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+
+    public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
         if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
             throw new IllegalArgumentException();
         }
 
+        if (streamingMap.containsKey(streamingConfig.getName()))
+            throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
+
         String path = formatStreamingConfigPath(streamingConfig.getName());
         getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+        streamingMap.put(streamingConfig.getName(),streamingConfig);
+        return streamingConfig;
     }
 
     public long getOffset(String streaming, int shard) {
@@ -199,6 +280,46 @@ public class StreamingManager {
         }
     }
 
+    private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
+        ResourceStore store = getStore();
+        StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
+
+        if (StringUtils.isBlank(streamingDesc.getName())) {
+            throw new IllegalStateException("StreamingConfig name must not be blank");
+        }
+        return streamingDesc;
+    }
+
+    private void reloadAllStreaming() throws IOException {
+        ResourceStore store = getStore();
+        logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
+
+        streamingMap.clear();
+
+        List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            StreamingConfig streamingConfig;
+            try {
+                streamingConfig = loadStreamingConfigAt(path);
+            } catch (Exception e) {
+                logger.error("Error loading streaming desc " + path, e);
+                continue;
+            }
+            if (path.equals(streamingConfig.getResourcePath()) == false) {
+                logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
+                continue;
+            }
+            if (streamingMap.containsKey(streamingConfig.getName())) {
+                logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
+                continue;
+            }
+
+            streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+        }
+
+        logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+    }    
+    
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index d61793d..9ce6452 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -34,6 +34,7 @@ import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.exception.JobException;
@@ -46,10 +47,14 @@ import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CubeRequest;
 import org.apache.kylin.rest.request.CubeSegmentRequest;
 import org.apache.kylin.rest.request.JobBuildRequest;
+import org.apache.kylin.rest.request.StreamingRequest;
 import org.apache.kylin.rest.response.GeneralResponse;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
+import org.apache.kylin.rest.service.KafkaConfigService;
+import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,8 +75,6 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 
 /**
  * CubeController is defined as Restful API entrance for UI.
- *
- * @author jianliu
  */
 @Controller
 @RequestMapping(value = "/cubes")
@@ -79,12 +82,18 @@ public class CubeController extends BasicController {
     private static final Logger logger = LoggerFactory.getLogger(CubeController.class);
 
     @Autowired
+    private StreamingService streamingService;
+
+    @Autowired
+    private KafkaConfigService kafkaConfigService;
+
+    @Autowired
     private CubeService cubeService;
 
     @Autowired
     private JobService jobService;
 
-    @RequestMapping(value = "", method = { RequestMethod.GET })
+    @RequestMapping(value = "", method = {RequestMethod.GET})
     @ResponseBody
     public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "modelName", required = false) String modelName, @RequestParam(value = "projectName", required = false) String projectName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
         return cubeService.getCubes(cubeName, projectName, modelName, limit, offset);
@@ -98,7 +107,7 @@ public class CubeController extends BasicController {
      * @throws UnknownHostException
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { RequestMethod.GET })
+    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = {RequestMethod.GET})
     @ResponseBody
     public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -120,7 +129,7 @@ public class CubeController extends BasicController {
      * @param notifyList
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/notify_list", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/notify_list", method = {RequestMethod.PUT})
     @ResponseBody
     public void updateNotifyList(@PathVariable String cubeName, @RequestBody List<String> notifyList) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -138,7 +147,7 @@ public class CubeController extends BasicController {
 
     }
 
-    @RequestMapping(value = "/{cubeName}/cost", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/cost", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance updateCubeCost(@PathVariable String cubeName, @RequestParam(value = "cost") int cost) {
         try {
@@ -150,7 +159,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/coprocessor", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/coprocessor", method = {RequestMethod.PUT})
     @ResponseBody
     public Map<String, Boolean> updateCubeCoprocessor(@PathVariable String cubeName, @RequestParam(value = "force") String force) {
         try {
@@ -168,7 +177,7 @@ public class CubeController extends BasicController {
      *
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName, @RequestParam(value = "lookupTable") String lookupTable) {
         try {
@@ -186,7 +195,7 @@ public class CubeController extends BasicController {
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/rebuild", method = {RequestMethod.PUT})
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest jobBuildRequest) {
         try {
@@ -203,7 +212,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/disable", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance disableCube(@PathVariable String cubeName) {
         try {
@@ -221,7 +230,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/purge", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/purge", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance purgeCube(@PathVariable String cubeName) {
         try {
@@ -239,7 +248,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/enable", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/enable", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance enableCube(@PathVariable String cubeName) {
         try {
@@ -256,7 +265,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}", method = { RequestMethod.DELETE })
+    @RequestMapping(value = "/{cubeName}", method = {RequestMethod.DELETE})
     @ResponseBody
     public void deleteCube(@PathVariable String cubeName) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -264,44 +273,126 @@ public class CubeController extends BasicController {
             throw new NotFoundException("Cube with name " + cubeName + " not found..");
         }
 
+        //drop related StreamingConfig and KafkaConfig if they exist
+        try {
+            List<StreamingConfig> configs= streamingService.listAllStreamingConfigs(cubeName);
+            for(StreamingConfig config:configs){
+                try {
+                    streamingService.dropStreamingConfig(config);
+                } catch (IOException e) {
+                    logger.error(e.getLocalizedMessage(), e);
+                    throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
+                }
+                try {
+                    KafkaConfig kfkConfig = kafkaConfigService.getKafkaConfig(config.getName());
+                    kafkaConfigService.dropKafkaConfig(kfkConfig);
+                } catch (IOException e) {
+                    throw new InternalErrorException("Failed to delete KafkaConfig. " + " Caused by: " + e.getMessage(), e);
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        //drop Cube
         try {
             cubeService.deleteCube(cube);
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException("Failed to delete cube. " + " Caused by: " + e.getMessage(), e);
         }
+
     }
 
     /**
-     *save cubeDesc
+     * save cubeDesc
      *
      * @return Table metadata array
      * @throws IOException
      */
-    @RequestMapping(value = "", method = { RequestMethod.POST })
+    @RequestMapping(value = "", method = {RequestMethod.POST})
     @ResponseBody
     public CubeRequest saveCubeDesc(@RequestBody CubeRequest cubeRequest) {
 
         CubeDesc desc = deserializeCubeDesc(cubeRequest);
         if (desc == null) {
+            cubeRequest.setMessage("CubeDesc is null.");
             return cubeRequest;
         }
-
         String name = CubeService.getCubeNameFromDesc(desc.getName());
         if (StringUtils.isEmpty(name)) {
             logger.info("Cube name should not be empty.");
             throw new BadRequestException("Cube name should not be empty.");
         }
 
+        CubeInstance cubeInstance;
+
         try {
             desc.setUuid(UUID.randomUUID().toString());
             String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject();
-            cubeService.createCubeAndDesc(name, projectName, desc);
+            cubeInstance = cubeService.createCubeAndDesc(name, projectName, desc);
         } catch (Exception e) {
             logger.error("Failed to deal with the request.", e);
             throw new InternalErrorException(e.getLocalizedMessage(), e);
         }
 
+        //streaming Cube
+        if (cubeRequest.getStreamingCube().equals("true")) {
+            StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest);
+            KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest);
+            if (streamingConfig == null) {
+                cubeRequest.setMessage("No StreamingConfig info defined.");
+                return cubeRequest;
+            }
+            if (kafkaConfig == null) {
+                cubeRequest.setMessage("No KafkaConfig info defined.");
+                return cubeRequest;
+            }
+            if (StringUtils.isEmpty(streamingConfig.getName())) {
+                logger.info("StreamingConfig Name should not be empty.");
+                throw new BadRequestException("StremingConfig name should not be empty.");
+            }
+
+            if (StringUtils.isEmpty(kafkaConfig.getName())) {
+                logger.info("KafkaConfig Name should not be empty.");
+                throw new BadRequestException("KafkaConfig name should not be empty.");
+            }
+
+            try {
+                streamingConfig.setUuid(UUID.randomUUID().toString());
+                streamingService.createStreamingConfig(streamingConfig);
+
+            } catch (IOException e) {
+                try {
+                    cubeService.deleteCube(cubeInstance);
+                } catch (Exception ex) {
+                    throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex);
+                }
+                logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage());
+            }
+            try {
+                kafkaConfig.setUuid(UUID.randomUUID().toString());
+                kafkaConfigService.createKafkaConfig(kafkaConfig);
+            } catch (IOException e) {
+                try {
+                    streamingService.dropStreamingConfig(streamingConfig);
+                } catch (IOException e1) {
+                    throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
+                }
+
+                try {
+                    cubeService.deleteCube(cubeInstance);
+                } catch (Exception ex) {
+                    throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex);
+                }
+                logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
+            }
+
+        }
+
+
         cubeRequest.setUuid(desc.getUuid());
         cubeRequest.setSuccessful(true);
         return cubeRequest;
@@ -314,7 +405,7 @@ public class CubeController extends BasicController {
      * @throws JsonProcessingException
      * @throws IOException
      */
-    @RequestMapping(value = "", method = { RequestMethod.PUT })
+    @RequestMapping(value = "", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeRequest updateCubeDesc(@RequestBody CubeRequest cubeRequest) throws JsonProcessingException {
 
@@ -356,6 +447,46 @@ public class CubeController extends BasicController {
             logger.warn("Cube " + desc.getName() + " fail to update because " + desc.getError());
             updateRequest(cubeRequest, false, omitMessage(desc.getError()));
         }
+
+        // Streaming cube: also persist the StreamingConfig/KafkaConfig carried by the request.
+        // Constant-first equals keeps requests without a streamingCube flag from throwing a NullPointerException.
+        if ("true".equals(cubeRequest.getStreamingCube())) {
+            StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest);
+            KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest);
+            if (streamingConfig == null) {
+                cubeRequest.setMessage("No StreamingConfig info to update.");
+                return cubeRequest;
+            }
+            if (kafkaConfig == null) {
+                cubeRequest.setMessage("No KafkaConfig info to update.");
+                return cubeRequest;
+            }
+            if (StringUtils.isEmpty(streamingConfig.getName())) {
+                logger.info("StreamingConfig Name should not be empty.");
+                throw new BadRequestException("StreamingConfig name should not be empty.");
+            }
+
+            if (StringUtils.isEmpty(kafkaConfig.getName())) {
+                logger.info("KafkaConfig Name should not be empty.");
+                throw new BadRequestException("KafkaConfig name should not be empty.");
+            }
+
+            try {
+                streamingService.updateStreamingConfig(streamingConfig);
+            } catch (IOException e) {
+                logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage(), e);
+            }
+            try {
+                kafkaConfigService.updateKafkaConfig(kafkaConfig);
+            } catch (IOException e) {
+                logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage(), e);
+            }
+
+        }
+
+
         String descData = JsonUtil.writeValueAsIndentString(desc);
         cubeRequest.setCubeDescData(descData);
         cubeRequest.setSuccessful(true);
@@ -363,12 +494,12 @@ public class CubeController extends BasicController {
     }
 
     /**
-     *get Hbase Info
+     * get Hbase Info
      *
      * @return true
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/hbase", method = { RequestMethod.GET })
+    @RequestMapping(value = "/{cubeName}/hbase", method = {RequestMethod.GET})
     @ResponseBody
     public List<HBaseResponse> getHBaseInfo(@PathVariable String cubeName) {
         List<HBaseResponse> hbase = new ArrayList<HBaseResponse>();
@@ -405,7 +536,7 @@ public class CubeController extends BasicController {
         return hbase;
     }
 
-    @RequestMapping(value = "/{cubeName}/segments", method = { RequestMethod.POST })
+    @RequestMapping(value = "/{cubeName}/segments", method = {RequestMethod.POST})
     @ResponseBody
     public CubeSegmentRequest appendSegment(@PathVariable String cubeName, @RequestBody CubeSegmentRequest cubeSegmentRequest) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -470,12 +601,49 @@ public class CubeController extends BasicController {
         return desc;
     }
 
+    /** Reads the request's streaming JSON into a StreamingConfig; malformed JSON flags the request and yields null. */
+    private StreamingConfig deserializeStreamingDesc(CubeRequest cubeRequest) {
+        try {
+            logger.debug("Saving StreamingConfig " + cubeRequest.getStreamingData());
+            return JsonUtil.readValue(cubeRequest.getStreamingData(), StreamingConfig.class);
+        } catch (JsonParseException badSyntax) {
+            logger.error("The StreamingConfig definition is not valid.", badSyntax);
+            updateRequest(cubeRequest, false, badSyntax.getMessage());
+        } catch (JsonMappingException badMapping) {
+            logger.error("The data StreamingConfig definition is not valid.", badMapping);
+            updateRequest(cubeRequest, false, badMapping.getMessage());
+        } catch (IOException ioFailure) {
+            logger.error("Failed to deal with the request.", ioFailure);
+            throw new InternalErrorException("Failed to deal with the request:" + ioFailure.getMessage(), ioFailure);
+        }
+        return null;
+    }
+
+
+    /** Reads the request's Kafka JSON into a KafkaConfig; malformed JSON flags the request and yields null. */
+    private KafkaConfig deserializeKafkaDesc(CubeRequest cubeRequest) {
+        try {
+            logger.debug("Saving KafkaConfig " + cubeRequest.getKafkaData());
+            return JsonUtil.readValue(cubeRequest.getKafkaData(), KafkaConfig.class);
+        } catch (JsonParseException badSyntax) {
+            logger.error("The KafkaConfig definition is not valid.", badSyntax);
+            updateRequest(cubeRequest, false, badSyntax.getMessage());
+        } catch (JsonMappingException badMapping) {
+            logger.error("The data KafkaConfig definition is not valid.", badMapping);
+            updateRequest(cubeRequest, false, badMapping.getMessage());
+        } catch (IOException ioFailure) {
+            logger.error("Failed to deal with the request.", ioFailure);
+            throw new InternalErrorException("Failed to deal with the request:" + ioFailure.getMessage(), ioFailure);
+        }
+        return null;
+    }
+
     /**
      * @return
      */
     private String omitMessage(List<String> errors) {
         StringBuffer buffer = new StringBuffer();
-        for (Iterator<String> iterator = errors.iterator(); iterator.hasNext();) {
+        for (Iterator<String> iterator = errors.iterator(); iterator.hasNext(); ) {
             String string = (String) iterator.next();
             buffer.append(string);
             buffer.append("\n");
@@ -497,4 +665,11 @@ public class CubeController extends BasicController {
         this.jobService = jobService;
     }
 
+    public void setStreamingService(StreamingService streamingService) {
+        this.streamingService = streamingService; // service this controller uses to create/update/drop StreamingConfigs
+    }
+
+    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
+        this.kafkaConfigService = kafkaConfigService; // service this controller uses to create/update KafkaConfigs
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java b/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
index b8af9d6..a409938 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
@@ -117,7 +117,7 @@ public class ModelController extends BasicController {
         try {
             modelDesc = modelService.updateModelAndDesc(modelDesc);
         } catch (AccessDeniedException accessDeniedException) {
-            throw new ForbiddenException("You don't have right to update this cube.");
+            throw new ForbiddenException("You don't have right to update this model.");
         } catch (Exception e) {
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
             throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
new file mode 100644
index 0000000..78b63c0
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.rest.exception.BadRequestException;
+import org.apache.kylin.rest.exception.ForbiddenException;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.exception.NotFoundException;
+import org.apache.kylin.rest.request.StreamingRequest;
+import org.apache.kylin.rest.service.KafkaConfigService;
+import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.AccessDeniedException;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * StreamingController is defined as Restful API entrance for UI.
+ * @author jiazhong
+ */
+@Controller
+@RequestMapping(value = "/streaming")
+public class StreamingController extends BasicController {
+    private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
+
+    @Autowired
+    private StreamingService streamingService;
+    @Autowired
+    private KafkaConfigService kafkaConfigService;
+
+    /** Lists StreamingConfigs by delegating to StreamingService; cubeName, limit and offset are all optional. */
+    @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
+    @ResponseBody
+    public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+        try {
+            return streamingService.getStreamingConfigs(cubeName, limit, offset);
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage(), e);
+        }
+    }
+
+    /** Lists KafkaConfigs by delegating to KafkaConfigService; kafkaConfigName, limit and offset are all optional. */
+    @RequestMapping(value = "/getKfkConfig", method = { RequestMethod.GET })
+    @ResponseBody
+    public List<KafkaConfig> getKafkaConfigs(@RequestParam(value = "kafkaConfigName", required = false) String kafkaConfigName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+        try {
+            return kafkaConfigService.getKafkaConfigs(kafkaConfigName, limit, offset);
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Creates a StreamingConfig and its KafkaConfig from the request JSON. If the KafkaConfig cannot be saved,
+     * the StreamingConfig just created is dropped again. A payload that fails to parse is returned as flagged by the parser.
+     */
+    @RequestMapping(value = "", method = { RequestMethod.POST })
+    @ResponseBody
+    public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest streamingRequest) {
+        StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+        KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+        if (streamingConfig == null || kafkaConfig == null) {
+            return streamingRequest;
+        }
+        if (StringUtils.isEmpty(streamingConfig.getName())) {
+            logger.info("StreamingConfig should not be empty.");
+            throw new BadRequestException("StreamingConfig name should not be empty.");
+        }
+        if (StringUtils.isEmpty(kafkaConfig.getName())) {
+            logger.info("KafkaConfig Name should not be empty.");
+            throw new BadRequestException("KafkaConfig name should not be empty.");
+        }
+        try {
+            streamingConfig.setUuid(UUID.randomUUID().toString());
+            streamingService.createStreamingConfig(streamingConfig);
+        } catch (IOException e) {
+            logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage(), e);
+        }
+        try {
+            kafkaConfig.setUuid(UUID.randomUUID().toString());
+            kafkaConfigService.createKafkaConfig(kafkaConfig);
+        } catch (IOException e) {
+            logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
+            try {
+                // roll back so no StreamingConfig is left behind without its KafkaConfig
+                streamingService.dropStreamingConfig(streamingConfig);
+            } catch (IOException e1) {
+                throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage(), e);
+            }
+            throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage(), e);
+        }
+        streamingRequest.setSuccessful(true);
+        return streamingRequest;
+    }
+
+    /** Despite its name, updates a StreamingConfig and then its KafkaConfig; nothing is updated if either JSON fails to parse. */
+    @RequestMapping(value = "", method = { RequestMethod.PUT })
+    @ResponseBody
+    public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+        StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+        KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+        if (streamingConfig == null || kafkaConfig == null) {
+            return streamingRequest;
+        }
+        try {
+            streamingService.updateStreamingConfig(streamingConfig);
+        } catch (AccessDeniedException accessDeniedException) {
+            throw new ForbiddenException("You don't have right to update this StreamingConfig.");
+        } catch (Exception e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage(), e);
+        }
+        try {
+            kafkaConfigService.updateKafkaConfig(kafkaConfig);
+        } catch (AccessDeniedException accessDeniedException) {
+            throw new ForbiddenException("You don't have right to update this KafkaConfig.");
+        } catch (Exception e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage(), e);
+        }
+        streamingRequest.setSuccessful(true);
+        return streamingRequest;
+    }
+
+    /** Drops the named StreamingConfig and, if a KafkaConfig is found under the same name, that KafkaConfig too. */
+    @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
+    @ResponseBody
+    public void deleteConfig(@PathVariable String configName) throws IOException {
+        StreamingConfig config = streamingService.getSreamingManager().getStreamingConfig(configName);
+        if (null == config) {
+            throw new NotFoundException("StreamingConfig with name " + configName + " not found..");
+        }
+        KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(configName);
+        try {
+            streamingService.dropStreamingConfig(config);
+            if (kafkaConfig != null) {
+                kafkaConfigService.dropKafkaConfig(kafkaConfig);
+            }
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
+        }
+    }
+
+    /** Parses the request's StreamingConfig JSON; malformed JSON flags the request as failed and yields null. */
+    private StreamingConfig deserializeSchemalDesc(StreamingRequest streamingRequest) {
+        try {
+            logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
+            return JsonUtil.readValue(streamingRequest.getStreamingConfig(), StreamingConfig.class);
+        } catch (JsonParseException e) {
+            logger.error("The StreamingConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (JsonMappingException e) {
+            logger.error("The data StreamingConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request.", e);
+            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+        }
+        return null;
+    }
+
+    /** Parses the request's KafkaConfig JSON; malformed JSON flags the request as failed and yields null. */
+    private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
+        try {
+            logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
+            return JsonUtil.readValue(streamingRequest.getKafkaConfig(), KafkaConfig.class);
+        } catch (JsonParseException e) {
+            logger.error("The KafkaConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (JsonMappingException e) {
+            logger.error("The data KafkaConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request.", e);
+            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+        }
+        return null;
+    }
+
+    private void updateRequest(StreamingRequest request, boolean success, String message) {
+        request.setSuccessful(success);
+        request.setMessage(message);
+    }
+
+    public void setStreamingService(StreamingService streamingService) {
+        this.streamingService = streamingService;
+    }
+
+    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
+        this.kafkaConfigService = kafkaConfigService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java b/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
index 1371a4f..fb9952b 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
@@ -23,9 +23,12 @@ public class CubeRequest {
     private String uuid;
     private String cubeName;
     private String cubeDescData;
+    private String streamingData;
+    private String kafkaData;
     private boolean successful;
     private String message;
     private String project;
+    private String streamingCube;
 
     public String getUuid() {
         return uuid;
@@ -104,4 +107,27 @@ public class CubeRequest {
         this.project = project;
     }
 
+    public String getStreamingCube() {
+        return streamingCube; // string flag; CubeController compares it against the literal "true"
+    }
+
+    public void setStreamingCube(String streamingCube) {
+        this.streamingCube = streamingCube;
+    }
+
+    public String getStreamingData() {
+        return streamingData; // JSON text that CubeController deserializes into a StreamingConfig
+    }
+
+    public void setStreamingData(String streamingData) {
+        this.streamingData = streamingData;
+    }
+
+    public String getKafkaData() {
+        return kafkaData; // JSON text that CubeController deserializes into a KafkaConfig
+    }
+
+    public void setKafkaData(String kafkaData) {
+        this.kafkaData = kafkaData;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
index 3bcf9d7..07c30f3 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
@@ -25,6 +25,14 @@ import java.lang.String;public class StreamingRequest {
 
     private String tableData;
 
+    private String streamingConfig;
+
+    private String kafkaConfig;
+
+    private boolean successful;
+
+    private String message;
+
     public String getProject() {
         return project;
     }
@@ -40,4 +48,37 @@ import java.lang.String;public class StreamingRequest {
     public void setTableData(String tableData) {
         this.tableData = tableData;
     }
+
+    public boolean isSuccessful() {
+        return successful; // StreamingController sets this to true after a save/update, false on a JSON parse error
+    }
+
+    public void setSuccessful(boolean successful) {
+        this.successful = successful;
+    }
+
+    public String getMessage() {
+        return message; // StreamingController stores the parse exception's message here when the JSON is invalid
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getStreamingConfig() {
+        return streamingConfig; // JSON text that StreamingController deserializes into a StreamingConfig
+    }
+
+    public void setStreamingConfig(String streamingConfig) {
+        this.streamingConfig = streamingConfig;
+    }
+
+    public String getKafkaConfig() {
+        return kafkaConfig; // JSON text that StreamingController deserializes into a KafkaConfig
+    }
+
+    public void setKafkaConfig(String kafkaConfig) {
+        this.kafkaConfig = kafkaConfig;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 70deada..5ac12ea 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -56,6 +57,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,6 +169,14 @@ public abstract class BasicService {
         return CubeManager.getInstance(getConfig());
     }
 
+    public final StreamingManager getSreamingManager() { // NOTE(review): "Sreaming" is a typo, but StreamingController and CacheService call this spelling
+        return StreamingManager.getInstance(getConfig()); // manager instance for this service's KylinConfig
+    }
+
+    public final KafkaConfigManager getKafkaManager() throws IOException {
+        return  KafkaConfigManager.getInstance(getConfig()); // manager instance for this service's KylinConfig
+    }
+
     public final CubeDescManager getCubeDescManager() {
         return CubeDescManager.getInstance(getConfig());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 45cff4d..98ddc6f 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.metadata.MetadataManager;
@@ -35,6 +36,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,54 +77,62 @@ public class CacheService extends BasicService {
         logger.info(log);
         try {
             switch (cacheType) {
-            case CUBE:
-                CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
-                getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
-                getProjectManager().clearL2Cache();
-                //clean query related cache first
-                super.cleanDataCache(newCube.getUuid());
-                cubeService.updateOnNewSegmentReady(cacheKey);
-                break;
-            case CUBE_DESC:
-                getCubeDescManager().reloadCubeDescLocal(cacheKey);
-                break;
-            case PROJECT:
-                ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
-                removeOLAPDataSource(projectInstance.getName());
-                break;
-            case INVERTED_INDEX:
-                //II update does not need to update storage cache because it is dynamic already
-                getIIManager().reloadIILocal(cacheKey);
-                getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
-                getProjectManager().clearL2Cache();
-                break;
-            case INVERTED_INDEX_DESC:
-                getIIDescManager().reloadIIDescLocal(cacheKey);
-                break;
-            case TABLE:
-                getMetadataManager().reloadTableCache(cacheKey);
-                IIDescManager.clearCache();
-                CubeDescManager.clearCache();
-                break;
-            case DATA_MODEL:
-                getMetadataManager().reloadDataModelDesc(cacheKey);
-                IIDescManager.clearCache();
-                CubeDescManager.clearCache();
-                break;
-            case ALL:
-                MetadataManager.clearCache();
-                CubeDescManager.clearCache();
-                CubeManager.clearCache();
-                IIDescManager.clearCache();
-                IIManager.clearCache();
-                HybridManager.clearCache();
-                RealizationRegistry.clearCache();
-                ProjectManager.clearCache();
-                super.cleanAllDataCache();
-                BasicService.removeAllOLAPDataSources();
-                break;
-            default:
-                throw new RuntimeException("invalid cacheType:" + cacheType);
+                case CUBE:
+                    CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
+                    getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
+                    getProjectManager().clearL2Cache();
+                    //clean query related cache first
+                    super.cleanDataCache(newCube.getUuid());
+                    cubeService.updateOnNewSegmentReady(cacheKey);
+                    break;
+                case STREAMING:
+                    getSreamingManager().reloadStreamingConfigLocal(cacheKey);
+                    break;
+                case KAFKA:
+                    getKafkaManager().reloadKafkaConfigLocal(cacheKey);
+                    break;
+                case CUBE_DESC:
+                    getCubeDescManager().reloadCubeDescLocal(cacheKey);
+                    break;
+                case PROJECT:
+                    ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
+                    removeOLAPDataSource(projectInstance.getName());
+                    break;
+                case INVERTED_INDEX:
+                    //II update does not need to update storage cache because it is dynamic already
+                    getIIManager().reloadIILocal(cacheKey);
+                    getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
+                    getProjectManager().clearL2Cache();
+                    break;
+                case INVERTED_INDEX_DESC:
+                    getIIDescManager().reloadIIDescLocal(cacheKey);
+                    break;
+                case TABLE:
+                    getMetadataManager().reloadTableCache(cacheKey);
+                    IIDescManager.clearCache();
+                    CubeDescManager.clearCache();
+                    break;
+                case DATA_MODEL:
+                    getMetadataManager().reloadDataModelDesc(cacheKey);
+                    IIDescManager.clearCache();
+                    CubeDescManager.clearCache();
+                    break;
+                case ALL:
+                    MetadataManager.clearCache();
+                    CubeDescManager.clearCache();
+                    CubeManager.clearCache();
+                    IIDescManager.clearCache();
+                    IIManager.clearCache();
+                    HybridManager.clearCache();
+                    RealizationRegistry.clearCache();
+                    ProjectManager.clearCache();
+                    KafkaConfigManager.clearCache();
+                    StreamingManager.clearCache();
+                    super.cleanAllDataCache();
+                    BasicService.removeAllOLAPDataSources();
+                    break;
+                default:
+                    throw new RuntimeException("invalid cacheType:" + cacheType);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);
@@ -133,29 +143,29 @@ public class CacheService extends BasicService {
         final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
-            case CUBE:
-                String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
-                getCubeManager().removeCubeLocal(cacheKey);
-                super.cleanDataCache(storageUUID);
-                break;
-            case CUBE_DESC:
-                getCubeDescManager().removeLocalCubeDesc(cacheKey);
-                break;
-            case PROJECT:
-                ProjectManager.clearCache();
-                break;
-            case INVERTED_INDEX:
-                getIIManager().removeIILocal(cacheKey);
-                break;
-            case INVERTED_INDEX_DESC:
-                getIIDescManager().removeIIDescLocal(cacheKey);
-                break;
-            case TABLE:
-                throw new UnsupportedOperationException(log);
-            case DATA_MODEL:
-                throw new UnsupportedOperationException(log);
-            default:
-                throw new RuntimeException("invalid cacheType:" + cacheType);
+                case CUBE:
+                    String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
+                    getCubeManager().removeCubeLocal(cacheKey);
+                    super.cleanDataCache(storageUUID);
+                    break;
+                case CUBE_DESC:
+                    getCubeDescManager().removeLocalCubeDesc(cacheKey);
+                    break;
+                case PROJECT:
+                    ProjectManager.clearCache();
+                    break;
+                case INVERTED_INDEX:
+                    getIIManager().removeIILocal(cacheKey);
+                    break;
+                case INVERTED_INDEX_DESC:
+                    getIIDescManager().removeIIDescLocal(cacheKey);
+                    break;
+                case TABLE:
+                    throw new UnsupportedOperationException(log);
+                case DATA_MODEL:
+                    throw new UnsupportedOperationException(log);
+                default:
+                    throw new RuntimeException("invalid cacheType:" + cacheType);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java b/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
new file mode 100644
index 0000000..a0b19b2
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component("kafkaMgmtService")
+public class KafkaConfigService extends BasicService {
+
+    @Autowired
+    private AccessService accessService;
+
+    @PostFilter(Constant.ACCESS_POST_FILTER_READ)
+    public List<KafkaConfig> listAllKafkaConfigs(final String kafkaConfigName) throws IOException {
+        List<KafkaConfig> kafkaConfigs = new ArrayList<KafkaConfig>();
+//        CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
+        if (null == kafkaConfigName) {
+            kafkaConfigs = getKafkaManager().listAllKafkaConfigs();
+        } else {
+            List<KafkaConfig> configs = getKafkaManager().listAllKafkaConfigs();
+            for(KafkaConfig config : configs){
+                if(kafkaConfigName.equals(config.getName())){
+                    kafkaConfigs.add(config);
+                }
+            }
+        }
+
+        return kafkaConfigs;
+    }
+
+    public List<KafkaConfig> getKafkaConfigs(final String kafkaConfigName, final Integer limit, final Integer offset) throws IOException {
+
+        List<KafkaConfig> kafkaConfigs;
+        kafkaConfigs = listAllKafkaConfigs(kafkaConfigName);
+
+        if (limit == null || offset == null) {
+            return kafkaConfigs;
+        }
+
+        if ((kafkaConfigs.size() - offset) < limit) {
+            return kafkaConfigs.subList(offset, kafkaConfigs.size());
+        }
+
+        return kafkaConfigs.subList(offset, offset + limit);
+    }
+
+
+
+    public KafkaConfig createKafkaConfig(KafkaConfig config) throws IOException {
+        if (getKafkaManager().getKafkaConfig(config.getName()) != null) {
+            throw new InternalErrorException("The kafkaConfig named " + config.getName() + " already exists");
+        }
+        getKafkaManager().createKafkaConfig(config.getName(),config);
+        return config;
+    }
+
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public KafkaConfig updateKafkaConfig(KafkaConfig config) throws IOException {
+        return getKafkaManager().updateKafkaConfig(config);
+    }
+
+    public KafkaConfig getKafkaConfig(String configName) throws IOException {
+        return getKafkaManager().getKafkaConfig(configName);
+    }
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public void dropKafkaConfig(KafkaConfig config) throws IOException {
+        getKafkaManager().removeKafkaConfig(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
new file mode 100644
index 0000000..0f67ac8
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component("streamingMgmtService")
+public class StreamingService extends BasicService {
+
+    @Autowired
+    private AccessService accessService;
+
+    @PostFilter(Constant.ACCESS_POST_FILTER_READ)
+    public List<StreamingConfig> listAllStreamingConfigs(final String cubeName) throws IOException {
+        List<StreamingConfig> streamingConfigs = new ArrayList();
+        CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
+        if (null == cubeInstance) {
+            streamingConfigs = getSreamingManager().listAllStreaming();
+        } else {
+            for(StreamingConfig config : getSreamingManager().listAllStreaming()){
+                if(cubeInstance.getName().equals(config.getCubeName())){
+                    streamingConfigs.add(config);
+                }
+            }
+        }
+
+        return streamingConfigs;
+    }
+
+    public List<StreamingConfig> getStreamingConfigs(final String cubeName, final Integer limit, final Integer offset) throws IOException {
+
+        List<StreamingConfig> streamingConfigs;
+        streamingConfigs = listAllStreamingConfigs(cubeName);
+
+        if (limit == null || offset == null) {
+            return streamingConfigs;
+        }
+
+        if ((streamingConfigs.size() - offset) < limit) {
+            return streamingConfigs.subList(offset, streamingConfigs.size());
+        }
+
+        return streamingConfigs.subList(offset, offset + limit);
+    }
+
+    public StreamingConfig createStreamingConfig(StreamingConfig config) throws IOException {
+        if (getSreamingManager().getStreamingConfig(config.getName()) != null) {
+            throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
+        }
+        StreamingConfig streamingConfig =  getSreamingManager().saveStreamingConfig(config);
+        return streamingConfig;
+    }
+
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
+        return getSreamingManager().updateStreamingConfig(config);
+    }
+
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public void dropStreamingConfig(StreamingConfig config) throws IOException {
+        getSreamingManager().removeStreamingConfig(config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/resources/kylinSecurity.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 518a9bd..ee3c891 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -19,6 +19,8 @@
         <scr:intercept-url pattern="/api/cache*/**" access="permitAll" />
 		<scr:intercept-url pattern="/api/cubes/src/tables" access="hasAnyRole('ROLE_ANALYST')" />
 		<scr:intercept-url pattern="/api/cubes*/**" access="isAuthenticated()" />
+		<scr:intercept-url pattern="/api/models*/**" access="isAuthenticated()" />
+		<scr:intercept-url pattern="/api/streaming*/**" access="isAuthenticated()" />
 		<scr:intercept-url pattern="/api/job*/**" access="isAuthenticated()" />
 		<scr:intercept-url pattern="/api/admin/config" access="permitAll" />
 		<scr:intercept-url pattern="/api/projects" access="permitAll" />

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index d8f9f50..3032d13 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -40,13 +40,21 @@ import com.fasterxml.jackson.databind.type.MapType;
 import com.fasterxml.jackson.databind.type.SimpleType;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -60,15 +68,25 @@ public class KafkaConfigManager {
 
     private KylinConfig config;
 
-    private KafkaConfigManager(KylinConfig config) {
+    // name ==> KafkaConfig
+    private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA);
+
+    public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
+
+    /**
+     * Empties the static CACHE of manager instances, so that the next
+     * getInstance() call has to build a new manager.
+     */
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    /**
+     * Creates the manager for the given config and immediately loads all
+     * KafkaConfig entries into the in-memory cache.
+     *
+     * @throws IOException propagated from reloadAllKafkaConfig()
+     */
+    private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
+        reloadAllKafkaConfig();
     }
 
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }
 
-    public static KafkaConfigManager getInstance(KylinConfig config) {
+    public static KafkaConfigManager getInstance(KylinConfig config){
         KafkaConfigManager r = CACHE.get(config);
         if (r != null) {
             return r;
@@ -79,15 +97,43 @@ public class KafkaConfigManager {
             if (r != null) {
                 return r;
             }
+            try{
             r = new KafkaConfigManager(config);
             CACHE.put(config, r);
             if (CACHE.size() > 1) {
-                logger.warn("More than one cubemanager singleton exist");
+                logger.warn("More than one KafkaConfigManager singleton exist");
             }
             return r;
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
+        }
         }
     }
 
+    /**
+     * Returns all cached Kafka configs as a new list; changes to the returned
+     * list do not touch the cache itself.
+     *
+     * @return a fresh, mutable list of the cached KafkaConfig values
+     */
+    public List<KafkaConfig> listAllKafkaConfigs() {
+        // Typed constructor instead of the raw ArrayList, to avoid an unchecked conversion.
+        return new ArrayList<KafkaConfig>(kafkaMap.values());
+    }
+
+    /**
+     * Re-reads the named KafkaConfig from the resource store and replaces the
+     * locally cached entry with the loaded instance.
+     *
+     * @param name name of the KafkaConfig to reload
+     * @return the freshly loaded KafkaConfig
+     * @throws IOException propagated from loading the resource
+     */
+    public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
+        // Load the persisted state from the path derived from the name.
+        KafkaConfig reloaded = loadKafkaConfigAt(KafkaConfig.getKafkaResourcePath(name));
+
+        // Swap the cached entry via putLocal (presumably local-only, without
+        // notifying other nodes; confirm against CaseInsensitiveStringCache).
+        kafkaMap.putLocal(reloaded.getName(), reloaded);
+        return reloaded;
+    }
+
     private boolean checkExistence(String name) {
         return true;
     }
@@ -96,9 +142,17 @@ public class KafkaConfigManager {
         return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + name + ".json";
     }
 
-    public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
+    public boolean createKafkaConfig(String name, KafkaConfig config) {
+
+        if (config == null || StringUtils.isEmpty(config.getName())) {
+            throw new IllegalArgumentException();
+        }
+
+        if (kafkaMap.containsKey(config.getName()))
+            throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists");
         try {
             getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+            kafkaMap.put(config.getName(), config);
             return true;
         } catch (IOException e) {
             logger.error("error save resource name:" + name, e);
@@ -106,13 +160,41 @@ public class KafkaConfigManager {
         }
     }
 
-    public KafkaConfig getKafkaConfig(String name) {
-        try {
-            return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get resource name:" + name, e);
-            throw new RuntimeException("error get resource name:" + name, e);
+    /**
+     * Persists an updated KafkaConfig and refreshes the cached entry.
+     *
+     * @param desc the new state; must carry a uuid and the name of an
+     *             already cached KafkaConfig
+     * @return the KafkaConfig as read back from the resource store, which is
+     *         also the instance placed in the cache
+     * @throws IllegalArgumentException if {@code desc} is null, lacks a uuid or
+     *         name, or names a KafkaConfig that is not cached
+     * @throws IOException propagated from the resource store
+     */
+    public KafkaConfig updateKafkaConfig(KafkaConfig desc) throws IOException {
+        // Validate KafkaConfig
+        if (desc == null || desc.getUuid() == null || desc.getName() == null) {
+            throw new IllegalArgumentException("KafkaConfig to update must have both a uuid and a name");
+        }
+        String name = desc.getName();
+        if (!kafkaMap.containsKey(name)) {
+            throw new IllegalArgumentException("KafkaConfig '" + name + "' does not exist.");
+        }
+
+        // Persist the new state
+        String path = desc.getResourcePath();
+        getStore().putResource(path, desc, KAFKA_SERIALIZER);
+
+        // Read it back and cache that instance, so the cache and the return
+        // value agree (same as reloadKafkaConfigLocal does).
+        KafkaConfig ndesc = loadKafkaConfigAt(path);
+        kafkaMap.put(ndesc.getName(), ndesc);
+
+        return ndesc;
+    }
+
+    /**
+     * Loads the KafkaConfig stored at the given resource path and checks that
+     * it is usable.
+     *
+     * @param path resource-store path of the KafkaConfig
+     * @return the loaded KafkaConfig, never {@code null}
+     * @throws IllegalStateException if nothing could be loaded from the path
+     *         or the loaded config has a blank name
+     * @throws IOException propagated from the resource store
+     */
+    private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
+        ResourceStore store = getStore();
+        KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, KAFKA_SERIALIZER);
+
+        // Fail with a clear message instead of a NullPointerException below
+        // (presumably getResource yields null for a missing resource; confirm).
+        if (kafkaConfig == null) {
+            throw new IllegalStateException("No KafkaConfig found at " + path);
+        }
+        if (StringUtils.isBlank(kafkaConfig.getName())) {
+            throw new IllegalStateException("KafkaConfig name must not be blank");
         }
+        return kafkaConfig;
+    }
+
+
+    /**
+     * Looks up a KafkaConfig by name in the in-memory cache; the resource
+     * store is not consulted.
+     *
+     * @param name name of the KafkaConfig
+     * @return the cached entry (presumably null when none exists; confirm
+     *         against CaseInsensitiveStringCache)
+     */
+    public KafkaConfig getKafkaConfig(String name) {
+        return kafkaMap.get(name);
     }
 
     public void saveKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
@@ -124,6 +206,44 @@ public class KafkaConfigManager {
         getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
     }
 
+    /**
+     * Deletes the given KafkaConfig from the resource store and then drops
+     * its entry from the cache.
+     *
+     * @param kafkaConfig the config to remove
+     * @throws IOException propagated from the resource store
+     */
+    public void removeKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
+        getStore().deleteResource(kafkaConfig.getResourcePath());
+        kafkaMap.remove(kafkaConfig.getName());
+    }
+
+
+    /**
+     * Rebuilds the cache from the resource store: clears it, then loads every
+     * resource under the Kafka root that has the metadata file suffix.
+     * Entries that fail to load, sit at a path other than their own resource
+     * path, or repeat an already loaded name are logged and skipped.
+     *
+     * @throws IOException propagated from listing the resources
+     */
+    private void reloadAllKafkaConfig() throws IOException {
+        ResourceStore resourceStore = getStore();
+        logger.info("Reloading Kafka Metadata from folder " + resourceStore.getReadableResourcePath(ResourceStore.KAfKA_RESOURCE_ROOT));
+
+        kafkaMap.clear();
+
+        for (String resourcePath : resourceStore.collectResourceRecursively(ResourceStore.KAfKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX)) {
+            KafkaConfig loaded;
+            try {
+                loaded = loadKafkaConfigAt(resourcePath);
+            } catch (Exception e) {
+                // One broken entry must not prevent the others from loading.
+                logger.error("Error loading kafkaConfig desc " + resourcePath, e);
+                continue;
+            }
+
+            if (!resourcePath.equals(loaded.getResourcePath())) {
+                logger.error("Skip suspicious desc at " + resourcePath + ", " + loaded + " should be at " + loaded.getResourcePath());
+            } else if (kafkaMap.containsKey(loaded.getName())) {
+                logger.error("Dup KafkaConfig name '" + loaded.getName() + "' on path " + resourcePath);
+            } else {
+                kafkaMap.putLocal(loaded.getName(), loaded);
+            }
+        }
+
+        logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
+    }
+
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 1aff0ce..90df9f5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -42,12 +42,14 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.MetadataConstants;
 
 /**
  */
@@ -84,6 +86,13 @@ public class KafkaConfig extends RootPersistentEntity {
     //"configA=1;configB=2"
     @JsonProperty("parserProperties")
     private String parserProperties;
+    /**
+     * Returns the resource-store path of this config, derived from its name
+     * via getKafkaResourcePath.
+     */
+    public String getResourcePath() {
+        return getKafkaResourcePath(name);
+    }
+
+    /**
+     * Builds the resource-store path for a Kafka config: the Kafka resource
+     * root, a slash, the given name and the metadata file suffix.
+     *
+     * @param streamingName config name used as the base name of the resource
+     * @return the resource path for that name
+     */
+    public static String getKafkaResourcePath(String streamingName) {
+        return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
+    }
 
     public List<KafkaClusterConfig> getKafkaClusterConfigs() {
         return kafkaClusterConfigs;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/index.html
----------------------------------------------------------------------
diff --git a/webapp/app/index.html b/webapp/app/index.html
index d2b7e68..caa8d2a 100644
--- a/webapp/app/index.html
+++ b/webapp/app/index.html
@@ -45,6 +45,7 @@
     <link rel="stylesheet" type="text/css" href="css/AdminLTE.css">
     <link rel="stylesheet" type="text/css" href="components/bootstrap-sweetalert/lib/sweet-alert.css">
     <link rel="stylesheet" type="text/css" href="components/angular-bootstrap-nav-tree/dist/abn_tree.css">
+    <link rel="stylesheet" type="text/css" href="components/angular-toggle-switch/angular-toggle-switch.css">
 
     <link rel="stylesheet/less" href="less/build.less">
     <!-- endref -->
@@ -102,6 +103,7 @@
 <script src="components/angular-underscore/angular-underscore.js"></script>
 <script src="components/jquery-ui/jquery-ui.min.js"></script>
 <script src="components/angular-ui-sortable/sortable.js"></script>
+<script src="components/angular-toggle-switch/angular-toggle-switch.js"></script>
 
 <script src="js/app.js"></script>
 <script src="js/config.js"></script>
@@ -120,6 +122,7 @@
 <script src="js/services/model.js"></script>
 
 <script src="js/services/cubes.js"></script>
+<script src="js/services/streaming.js"></script>
 <script src="js/services/graph.js"></script>
 <script src="js/services/jobs.js"></script>
 <script src="js/services/message.js"></script>
@@ -136,6 +139,8 @@
 <script src="js/model/jobConfig.js"></script>
 <script src="js/model/projectConfig.js"></script>
 <script src="js/model/tableConfig.js"></script>
+<script src="js/model/streamingModel.js"></script>
+<script src="js/model/streamingListModel.js"></script>
 <!--New GUI-->
 <script src="js/model/modelConfig.js"></script>
 
@@ -157,6 +162,7 @@
 <script src="js/controllers/job.js"></script>
 <script src="js/controllers/cube.js"></script>
 <script src="js/controllers/cubes.js"></script>
+<script src="js/controllers/streaming.js"></script>
 <script src="js/controllers/projects.js"></script>
 <script src="js/controllers/cubeEdit.js"></script>
 <script src="js/controllers/cubeSchema.js"></script>
@@ -166,15 +172,17 @@
 <script src="js/controllers/projectMeta.js"></script>
 <script src="js/controllers/cubeModel.js"></script>
 <script src="js/controllers/cubeDimensions.js"></script>
-<script src="js/controllers/cubeFilter.js"></script>
 <script src="js/controllers/cubeRefresh.js"></script>
 <script src="js/controllers/cubeAdvanceSetting.js"></script>
+<script src="js/controllers/cubeMeasures.js"></script>
 <!--New GUI-->
 <script src="js/controllers/modelSchema.js"></script>
 <script src="js/controllers/modelDimensions.js"></script>
 <script src="js/controllers/modelRefresh.js"></script>
 <script src="js/controllers/modelEdit.js"></script>
 
+<script src="js/controllers/streamingConfig.js"></script>
+
 <!--New GUI-->
 <script src="js/controllers/models.js"></script>
 
@@ -222,5 +230,27 @@
     </div>
 </script>
 
+<!-- static template for streaming save/update error notification -->
+<script type="text/ng-template" id="streamingResultError.html">
+  <div class="callout">
+    <h4>Error Message</h4>
+    <p>{{text}}</p>
+  </div>
+  <div class="callout callout-danger">
+    <h4>Streaming Schema</h4>
+    <pre>{{streamingSchema}}</pre>
+  </div>
+  <div class="callout callout-danger">
+    <h4>Kafka Schema</h4>
+    <pre>{{kfkSchema}}</pre>
+  </div>
+</script>
+
+<!-- static template for streaming save/update success notification -->
+<script type="text/ng-template" id="streamingResultSuccess.html">
+  <div class="callout callout-info">
+    <p>{{text}}</p>
+  </div>
+</script>
+
 </body>
 </html>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/app.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/app.js b/webapp/app/js/app.js
index 1e00bdc..dc9d087 100644
--- a/webapp/app/js/app.js
+++ b/webapp/app/js/app.js
@@ -17,4 +17,4 @@
 */
 
 //Kylin Application Module
-KylinApp = angular.module('kylin', ['ngRoute', 'ngResource', 'ngGrid', 'ui.bootstrap', 'ui.ace', 'base64', 'angularLocalStorage', 'localytics.directives', 'treeControl', 'nvd3ChartDirectives','ngLoadingRequest','oitozero.ngSweetAlert','ngCookies','angular-underscore', 'ngAnimate', 'ui.sortable','angularBootstrapNavTree']);
+KylinApp = angular.module('kylin', ['ngRoute', 'ngResource', 'ngGrid', 'ui.bootstrap', 'ui.ace', 'base64', 'angularLocalStorage', 'localytics.directives', 'treeControl', 'nvd3ChartDirectives','ngLoadingRequest','oitozero.ngSweetAlert','ngCookies','angular-underscore', 'ngAnimate', 'ui.sortable','angularBootstrapNavTree','toggle-switch']);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubeAdvanceSetting.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeAdvanceSetting.js b/webapp/app/js/controllers/cubeAdvanceSetting.js
index fb9c019..fc02c6e 100644
--- a/webapp/app/js/controllers/cubeAdvanceSetting.js
+++ b/webapp/app/js/controllers/cubeAdvanceSetting.js
@@ -98,4 +98,32 @@ KylinApp.controller('CubeAdvanceSettingCtrl', function ($scope, $modal,cubeConfi
     })
   }
 
+
+  $scope.addNewRowkeyColumn = function () {
+    $scope.cubeMetaFrame.rowkey.rowkey_columns.push({
+      "column": "",
+      "length": 0,
+      "dictionary": "true",
+      "mandatory": false
+    });
+  };
+
+  $scope.addNewAggregationGroup = function () {
+    $scope.cubeMetaFrame.rowkey.aggregation_groups.push([]);
+  };
+
+  $scope.refreshAggregationGroup = function (list, index, aggregation_groups) {
+    if (aggregation_groups) {
+      list[index] = aggregation_groups;
+    }
+  };
+
+  $scope.removeElement = function (arr, element) {
+    var index = arr.indexOf(element);
+    if (index > -1) {
+      arr.splice(index, 1);
+    }
+  };
+
+
 });



[3/5] incubator-kylin git commit: KYLIN-1041, Streaming UI

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubeEdit.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js
index 2dea4f4..2920458 100755
--- a/webapp/app/js/controllers/cubeEdit.js
+++ b/webapp/app/js/controllers/cubeEdit.js
@@ -14,629 +14,785 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 'use strict';
 
 
-KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $location, $templateCache, $interpolate, MessageService, TableService, CubeDescService, CubeService, loadingRequest, SweetAlert,$log,cubeConfig,CubeDescModel,MetaModel,TableModel,ModelDescService,modelsManager,cubesManager,ProjectModel) {
-    $scope.cubeConfig = cubeConfig;
+KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $location, $templateCache, $interpolate, MessageService, TableService, CubeDescService, CubeService, loadingRequest, SweetAlert, $log, cubeConfig, CubeDescModel, MetaModel, TableModel, ModelDescService, modelsManager, cubesManager, ProjectModel, StreamingModel, StreamingService) {
+  var STREAMING_SUFFIX = "_streaming";
+  $scope.cubeConfig = cubeConfig;
 
-  $scope.metaModel ={};
+  $scope.metaModel = {};
   $scope.modelsManager = modelsManager;
 
-    //add or edit ?
-    var absUrl = $location.absUrl();
-    $scope.cubeMode = absUrl.indexOf("/cubes/add")!=-1?'addNewCube':absUrl.indexOf("/cubes/edit")!=-1?'editExistCube':'default';
+  //add or edit ?
+  var absUrl = $location.absUrl();
+  $scope.cubeMode = absUrl.indexOf("/cubes/add") != -1 ? 'addNewCube' : absUrl.indexOf("/cubes/edit") != -1 ? 'editExistCube' : 'default';
 
-  if($scope.cubeMode=="addNewCube"&&!ProjectModel.getSelectedProject()){
+  if ($scope.cubeMode == "addNewCube" && !ProjectModel.getSelectedProject()) {
     SweetAlert.swal('Oops...', 'Please select your project first.', 'warning');
     $location.path("/models");
     return;
   }
 
 
-
   $scope.getColumnsByTable = function (tableName) {
-        var temp = [];
-        angular.forEach(TableModel.selectProjectTables, function (table) {
-            if (table.name == tableName) {
-                temp = table.columns;
-            }
-        });
-        return temp;
-    };
+    var temp = [];
+    angular.forEach(TableModel.selectProjectTables, function (table) {
+      if (table.name == tableName) {
+        temp = table.columns;
+      }
+    });
+    return temp;
+  };
 
-    $scope.getDimColumnsByTable = function (tableName) {
-        if(!tableName){
-            return [];
-        }
-        var tableColumns = $scope.getColumnsByTable(tableName);
-        var tableDim = _.find($scope.metaModel.model.dimensions,function(dimension){return dimension.table == tableName});
-        var tableDimColumns = tableDim.columns;
-        var avaColObject = _.filter(tableColumns,function(col){
-            return tableDimColumns.indexOf(col.name)!=-1;
-        });
-        return avaColObject;
-    };
+  // Returns the column objects of `tableName` that the model lists as dimension columns.
+  // Yields [] when no table name is given or the model has no dimension entry for it.
+  $scope.getDimColumnsByTable = function (tableName) {
+    if (!tableName) { return []; }
+    var tableDim = _.find($scope.metaModel.model.dimensions, function (dimension) {
+      return dimension.table == tableName;
+    });
+    // _.find yields undefined when nothing matches; reading .columns would then throw.
+    if (!tableDim) { return []; }
+    var tableColumns = $scope.getColumnsByTable(tableName);
+    return _.filter(tableColumns, function (col) {
+      return tableDim.columns.indexOf(col.name) != -1;
+    });
+  };
 
-    $scope.getMetricColumnsByTable = function (tableName) {
-        if(!tableName){
-            return [];
-        }
-        var tableColumns = $scope.getColumnsByTable(tableName);
-        var tableMetrics = $scope.metaModel.model.metrics;
-        var avaColObject = _.filter(tableColumns,function(col){
-            return tableMetrics.indexOf(col.name)!=-1;
-        });
-        return avaColObject;
-    };
+  // Returns the column objects of `tableName` whose names appear in the model's metrics list.
+  $scope.getMetricColumnsByTable = function (tableName) {
+    if (!tableName) {
+      return [];
+    }
+    var metricNames = $scope.metaModel.model.metrics;
+    var isMetricColumn = function (candidate) {
+      return metricNames.indexOf(candidate.name) != -1;
+    };
+    return _.filter($scope.getColumnsByTable(tableName), isMetricColumn);
+  };
+
+  // Looks up the datatype of the column named `_column` in `table`.
+  // Returns undefined when the table has no such column.
+  $scope.getColumnType = function (_column, table) {
+    var datatype;
+    angular.forEach($scope.getColumnsByTable(table), function (candidate) {
+      if (candidate.name === _column) {
+        datatype = candidate.datatype;
+      }
+    });
+    return datatype;
+  };
+
+  var ColFamily = function () {
+    var index = 1;
+    return function () {
+      var newColFamily =
+      {
+        "name": "f" + index,
+        "columns": [
+          {
+            "qualifier": "m",
+            "measure_refs": []
+          }
+        ]
+      };
+      index += 1;
 
-    $scope.getColumnType = function (_column,table){
-        var columns = $scope.getColumnsByTable(table);
-        var type;
-        angular.forEach(columns,function(column){
-            if(_column === column.name){
-                type = column.datatype;
-                return;
-            }
-        });
-        return type;
-    };
+      return newColFamily;
+    }
+  };
 
-    var ColFamily = function () {
-        var index = 1;
-        return function () {
-            var newColFamily =
-            {
-                "name": "f" + index,
-                "columns": [
-                    {
-                        "qualifier": "m",
-                        "measure_refs": []
-                    }
-                ]
-            };
-            index += 1;
-
-            return  newColFamily;
+
+  // ~ Define data
+  $scope.state = {
+    "cubeSchema": "", // JSON text of cubeMetaFrame, filled in via angular.toJson further down
+    "mode": 'edit'
+  };
+
+  $scope.cubeState={
+    "isStreaming": false // switched to true once a streaming config is found for the cube being edited
+  }
+
+  //fetch cube info and model info in edit mode
+  // ~ init
+  if ($scope.isEdit = !!$routeParams.cubeName) {
+    CubeDescService.query({cube_name: $routeParams.cubeName}, function (detail) {
+      if (detail.length > 0) {
+        $scope.cubeMetaFrame = detail[0];
+        $scope.metaModel = {};
+
+        //get model from API when page refresh
+        if (!modelsManager.getModels().length) {
+          ModelDescService.query({model_name: $scope.cubeMetaFrame.model_name}, function (_model) {
+            $scope.metaModel.model = _model;
+          });
         }
-    };
+        $scope.metaModel.model = modelsManager.getModel($scope.cubeMetaFrame.model_name);
 
+        $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);
 
-    // ~ Define data
-    $scope.state = {
-        "cubeSchema": "",
-        "mode":'edit'
-    };
+        StreamingService.getConfig({cubeName:$scope.cubeMetaFrame.name}, function (kfkConfigs) {
 
-    //fetch cube info and model info in edit model
-    // ~ init
-    if ($scope.isEdit = !!$routeParams.cubeName) {
-        CubeDescService.query({cube_name: $routeParams.cubeName}, function (detail) {
-            if (detail.length > 0) {
-                $scope.cubeMetaFrame = detail[0];
-                $scope.metaModel = {};
-
-                //get model from API when page refresh
-                if(!modelsManager.getModels().length){
-                    ModelDescService.query({model_name: $scope.cubeMetaFrame.model_name}, function (_model) {
-                        $scope.metaModel.model = _model;
-                    });
-                }
-                $scope.metaModel.model=modelsManager.getModel($scope.cubeMetaFrame.model_name);
-
-                $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);
-            }
-        });
+          $scope.streamingMeta = kfkConfigs[0];
+          if (!!kfkConfigs[0]) {  // no streaming config for this cube: skip the Kafka lookup instead of reading .name of undefined
+            $scope.cubeState.isStreaming = true;
+            StreamingService.getKfkConfig({kafkaConfigName: $scope.streamingMeta.name}, function (streamings) {
+              $scope.kafkaMeta = streamings[0];
+            })
+          }
+        })
+      }
+    });
 
-    } else {
+
+
+  } else {
 //        $scope.cubeMetaFrame = CubeDescModel.createNew();
-        $scope.cubeMetaFrame = CubeDescModel.createNew();
-        $scope.metaModel ={
-            model : modelsManager.getModel($scope.cubeMetaFrame.model_name)
-        }
-        //$scope.cubeMetaFrame.model_name = modelName;
-        $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);
+    $scope.cubeMetaFrame = CubeDescModel.createNew();
+    $scope.metaModel = {
+      model: modelsManager.getModel($scope.cubeMetaFrame.model_name)
     }
+    //$scope.cubeMetaFrame.model_name = modelName;
+    $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);
 
+    $scope.streamingMeta = StreamingModel.createStreamingConfig();
+    $scope.kafkaMeta = StreamingModel.createKafkaConfig();
 
-    $scope.prepareCube = function () {
-        // generate column family
-        generateColumnFamily();
-        //generate rowkey TODO remove after refactor
-        reGenerateRowKey();
+  }
 
 
-        if ($scope.metaModel.model.partition_desc.partition_date_column&&($scope.metaModel.model.partition_desc.partition_date_start|$scope.metaModel.model.partition_desc.partition_date_start==0)) {
-            var dateStart = new Date($scope.metaModel.model.partition_desc.partition_date_start);
-            dateStart = (dateStart.getFullYear() + "-" + (dateStart.getMonth() + 1) + "-" + dateStart.getDate());
-            //switch selected time to utc timestamp
-            $scope.metaModel.model.partition_desc.partition_date_start = new Date(moment.utc(dateStart, "YYYY-MM-DD").format()).getTime();
+  $scope.prepareCube = function () {
+    //generate column family
+    generateColumnFamily();
+    //generate rowkey
+    reGenerateRowKey();
 
+    if ($scope.metaModel.model.partition_desc.partition_date_column && ($scope.metaModel.model.partition_desc.partition_date_start || $scope.metaModel.model.partition_desc.partition_date_start == 0)) { // logical OR (not bitwise): a start of 0 still counts as a date
+      var dateStart = new Date($scope.metaModel.model.partition_desc.partition_date_start);
+      dateStart = (dateStart.getFullYear() + "-" + (dateStart.getMonth() + 1) + "-" + dateStart.getDate());
+      //switch selected time to utc timestamp
+      $scope.metaModel.model.partition_desc.partition_date_start = new Date(moment.utc(dateStart, "YYYY-MM-DD").format()).getTime();
 
-            if($scope.metaModel.model.partition_desc.partition_date_column.indexOf(".")==-1){
-            $scope.metaModel.model.partition_desc.partition_date_column=$scope.metaModel.model.fact_table+"."+$scope.metaModel.model.partition_desc.partition_date_column;
-            }
 
-        }
-        //use cubedesc name as model name
-        if($scope.metaModel.model.name===""||angular.isUndefined($scope.metaModel.model.name)){
-            $scope.metaModel.model.name = $scope.cubeMetaFrame.name;
-        }
+      if ($scope.metaModel.model.partition_desc.partition_date_column.indexOf(".") == -1) {
+        $scope.metaModel.model.partition_desc.partition_date_column = $scope.metaModel.model.fact_table + "." + $scope.metaModel.model.partition_desc.partition_date_column;
+      }
 
-        //set model ref for cubeDesc
-        if($scope.cubeMetaFrame.model_name===""||angular.isUndefined($scope.cubeMetaFrame.model_name)){
-            $scope.cubeMetaFrame.model_name = $scope.cubeMetaFrame.name;
-        }
+    }
+    //use cubedesc name as model name
+    if ($scope.metaModel.model.name === "" || angular.isUndefined($scope.metaModel.model.name)) {
+      $scope.metaModel.model.name = $scope.cubeMetaFrame.name;
+    }
 
-        $scope.state.project = ProjectModel.getSelectedProject();
+    //set model ref for cubeDesc
+    if ($scope.cubeMetaFrame.model_name === "" || angular.isUndefined($scope.cubeMetaFrame.model_name)) {
+      $scope.cubeMetaFrame.model_name = $scope.cubeMetaFrame.name;
+    }
+
+    $scope.state.project = ProjectModel.getSelectedProject();
 //        delete $scope.cubeMetaFrame.project;
-        $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);
-    };
+    $scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);
 
-    $scope.cubeResultTmpl = function (notification) {
-        // Get the static notification template.
-        var tmpl = notification.type == 'success' ? 'cubeResultSuccess.html' : 'cubeResultError.html';
-        return $interpolate($templateCache.get(tmpl))(notification);
-    };
+    //streaming meta
+    if($scope.cubeState.isStreaming == true){
+      $scope.streamingMeta.cubeName = $scope.cubeMetaFrame.name;
+      $scope.streamingMeta.name = $scope.cubeMetaFrame.name+STREAMING_SUFFIX;
+      $scope.kafkaMeta.name = $scope.cubeMetaFrame.name+STREAMING_SUFFIX;
+    }
 
-    $scope.saveCube = function () {
 
-        try {
-            angular.fromJson($scope.state.cubeSchema);
-        } catch (e) {
-            SweetAlert.swal('Oops...', 'Invalid cube json format..', 'error');
-            return;
-        }
 
-        SweetAlert.swal({
-            title: '',
-            text: 'Are you sure to save the cube ?',
-            type: '',
-            showCancelButton: true,
-            confirmButtonColor: '#DD6B55',
-            confirmButtonText: "Yes",
-            closeOnConfirm: true
-        }, function(isConfirm) {
-            if(isConfirm){
-                loadingRequest.show();
-
-                if ($scope.isEdit) {
-                    CubeService.update({}, {cubeDescData: $scope.state.cubeSchema, cubeName: $routeParams.cubeName, project: $scope.state.project}, function (request) {
-                        if (request.successful) {
-                            $scope.state.cubeSchema = request.cubeDescData;
-                            SweetAlert.swal('', 'Updated the cube successfully.', 'success');
-                            $location.path("/models");
-                        } else {
-                            $scope.saveCubeRollBack();
-                            $scope.cubeMetaFrame.project = $scope.state.project;
-                                var message =request.message;
-                                var msg = !!(message) ? message : 'Failed to take action.';
-                                MessageService.sendMsg($scope.cubeResultTmpl({'text':msg,'schema':$scope.state.cubeSchema}), 'error', {}, true, 'top_center');
-                        }
-                        //end loading
-                        loadingRequest.hide();
-                    }, function (e) {
-                        $scope.saveCubeRollBack();
-
-                        if(e.data&& e.data.exception){
-                            var message =e.data.exception;
-                            var msg = !!(message) ? message : 'Failed to take action.';
-                            MessageService.sendMsg($scope.cubeResultTmpl({'text':msg,'schema':$scope.state.cubeSchema}), 'error', {}, true, 'top_center');
-                        } else {
-                            MessageService.sendMsg($scope.cubeResultTmpl({'text':'Failed to take action.','schema':$scope.state.cubeSchema}), 'error', {}, true, 'top_center');
-                        }
-                        loadingRequest.hide();
-                    });
-                } else {
-                    CubeService.save({}, {cubeDescData: $scope.state.cubeSchema, project: $scope.state.project}, function (request) {
-                        if(request.successful) {
-                            $scope.state.cubeSchema = request.cubeDescData;
-
-                            SweetAlert.swal('', 'Created the cube successfully.', 'success');
-                            $location.path("/models");
-                            //location.reload();
-
-                        } else {
-                            $scope.saveCubeRollBack();
-                            $scope.cubeMetaFrame.project = $scope.state.project;
-                            var message =request.message;
-                            var msg = !!(message) ? message : 'Failed to take action.';
-                            MessageService.sendMsg($scope.cubeResultTmpl({'text':msg,'schema':$scope.state.cubeSchema}), 'error', {}, true, 'top_center');
-                        }
-
-                        //end loading
-                        loadingRequest.hide();
-                    }, function (e) {
-                        $scope.saveCubeRollBack();
-
-                        if (e.data && e.data.exception) {
-                            var message =e.data.exception;
-                            var msg = !!(message) ? message : 'Failed to take action.';
-                            MessageService.sendMsg($scope.cubeResultTmpl({'text':msg,'schema':$scope.state.cubeSchema}), 'error', {}, true, 'top_center');
-                        } else {
-                            MessageService.sendMsg($scope.cubeResultTmpl({'text':"Failed to take action.",'schema':$scope.state.cubeSchema}), 'error', {}, true, 'top_center');
-                        }
-                        //end loading
-                        loadingRequest.hide();
-
-                    });
-                }
-            }
-            else{
-                $scope.saveCubeRollBack();
-            }
-        });
-    };
+  };
 
-//    reverse the date
-    $scope.saveCubeRollBack = function (){
-        if($scope.metaModel.model&&($scope.metaModel.model.partition_desc.partition_date_start||$scope.metaModel.model.partition_desc.partition_date_start==0))
-        {
-            $scope.metaModel.model.partition_desc.partition_date_start+=new Date().getTimezoneOffset()*60000;
-        }
+  $scope.cubeResultTmpl = function (notification) {
+    // Render a cached notification template with `notification` as context: the success template when notification.type is 'success', the error template otherwise.
+    var tmpl = notification.type == 'success' ? 'cubeResultSuccess.html' : 'cubeResultError.html';
+    return $interpolate($templateCache.get(tmpl))(notification);
+  };
+
+  $scope.saveCube = function () {
+
+    try {
+      angular.fromJson($scope.state.cubeSchema);
+    } catch (e) {
+      SweetAlert.swal('Oops...', 'Invalid cube json format..', 'error');
+      return;
     }
 
-    $scope.updateMandatory = function(rowkey_column){
-        if(!rowkey_column.mandatory){
-            angular.forEach($scope.cubeMetaFrame.rowkey.aggregation_groups, function (group, index) {
-                   var index = group.indexOf(rowkey_column.column);
-                   if(index>-1){
-                       group.splice(index,1);
-                   }
-            });
-        }
+    if (!$scope.cubeState.isStreaming) {
+      $scope.state.streamingCube = false;
+    } else {
+      $scope.state.streamingCube = true;
+      $scope.state.streamingMeta = angular.toJson($scope.streamingMeta, true);
+      $scope.state.kafkaMeta = angular.toJson($scope.kafkaMeta, true);
     }
 
-    function reGenerateRowKey(){
-        $log.log("reGen rowkey & agg group");
-        var fk_pk = {};
-        var tmpRowKeyColumns = [];
-        var tmpAggregationItems = [];//put all aggregation item
-        var hierarchyItemArray = [];//put all hierarchy items
-        angular.forEach($scope.cubeMetaFrame.dimensions, function (dimension, index) {
-
-          // build fk_pk map
-          angular.forEach($scope.metaModel.model.lookups, function (_lookup, index) {
-            for (var i = 0; i < _lookup.join.foreign_key.length; i++) {
-              fk_pk[_lookup.join.primary_key[i]] = _lookup.join.foreign_key[i];
+
+    SweetAlert.swal({
+      title: '',
+      text: 'Are you sure to save the cube ?',
+      type: '',
+      showCancelButton: true,
+      confirmButtonColor: '#DD6B55',
+      confirmButtonText: "Yes",
+      closeOnConfirm: true
+    }, function (isConfirm) {
+      if (isConfirm) {
+        loadingRequest.show();
+
+        if ($scope.isEdit) {
+          CubeService.update({}, {
+            cubeDescData: $scope.state.cubeSchema,
+            cubeName: $routeParams.cubeName,
+            project: $scope.state.project,
+            streamingCube: $scope.state.streamingCube,
+            streamingData: $scope.state.streamingMeta,
+            kafkaData: $scope.state.kafkaMeta
+          }, function (request) {
+            if (request.successful) {
+              $scope.state.cubeSchema = request.cubeDescData;
+              SweetAlert.swal('', 'Updated the cube successfully.', 'success');
+              $location.path("/models");
+            } else {
+              $scope.saveCubeRollBack();
+              $scope.cubeMetaFrame.project = $scope.state.project;
+              var message = request.message;
+              var msg = !!(message) ? message : 'Failed to take action.';
+              MessageService.sendMsg($scope.cubeResultTmpl({
+                'text': msg,
+                'schema': $scope.state.cubeSchema
+              }), 'error', {}, true, 'top_center');
+            }
+            //end loading
+            loadingRequest.hide();
+          }, function (e) {
+            $scope.saveCubeRollBack();
+
+            if (e.data && e.data.exception) {
+              var message = e.data.exception;
+              var msg = !!(message) ? message : 'Failed to take action.';
+              MessageService.sendMsg($scope.cubeResultTmpl({
+                'text': msg,
+                'schema': $scope.state.cubeSchema
+              }), 'error', {}, true, 'top_center');
+            } else {
+              MessageService.sendMsg($scope.cubeResultTmpl({
+                'text': 'Failed to take action.',
+                'schema': $scope.state.cubeSchema
+              }), 'error', {}, true, 'top_center');
             }
+            loadingRequest.hide();
           });
+        } else {
+          CubeService.save({}, {
+            cubeDescData: $scope.state.cubeSchema,
+            project: $scope.state.project,
+            streamingCube: $scope.state.streamingCube,
+            streamingData: $scope.state.streamingMeta,
+            kafkaData: $scope.state.kafkaMeta
+          }, function (request) {
+            if (request.successful) {
+              $scope.state.cubeSchema = request.cubeDescData;
+              SweetAlert.swal('', 'Created the cube successfully.', 'success');
+              $location.path("/models");
+              //location.reload();
+
+            } else {
+              $scope.saveCubeRollBack();
+              $scope.cubeMetaFrame.project = $scope.state.project;
+              var message = request.message;
+              var msg = !!(message) ? message : 'Failed to take action.';
+              MessageService.sendMsg($scope.cubeResultTmpl({
+                'text': msg,
+                'schema': $scope.state.cubeSchema
+              }), 'error', {}, true, 'top_center');
+            }
 
-           //derived column
-            if(dimension.derived&&dimension.derived.length){
-                var lookup = _.find($scope.metaModel.model.lookups,function(lookup){return lookup.table==dimension.table});
-                angular.forEach(lookup.join.foreign_key, function (fk, index) {
-                    for (var i = 0; i < tmpRowKeyColumns.length; i++) {
-                        if(tmpRowKeyColumns[i].column == fk)
-                            break;
-                    }
-                    // push to array if no duplicate value
-                    if(i == tmpRowKeyColumns.length) {
-                        tmpRowKeyColumns.push({
-                            "column": fk,
-                            "length": 0,
-                            "dictionary": "true",
-                            "mandatory": false
-                        });
-
-                        tmpAggregationItems.push(fk);
-                    }
-                })
-
+            //end loading
+            loadingRequest.hide();
+          }, function (e) {
+            $scope.saveCubeRollBack();
+
+            if (e.data && e.data.exception) {
+              var message = e.data.exception;
+              var msg = !!(message) ? message : 'Failed to take action.';
+              MessageService.sendMsg($scope.cubeResultTmpl({
+                'text': msg,
+                'schema': $scope.state.cubeSchema
+              }), 'error', {}, true, 'top_center');
+            } else {
+              MessageService.sendMsg($scope.cubeResultTmpl({
+                'text': "Failed to take action.",
+                'schema': $scope.state.cubeSchema
+              }), 'error', {}, true, 'top_center');
             }
-            //normal column
-            else if (dimension.column&&!dimension.hierarchy&&dimension.column.length==1) {
-                for (var i = 0; i < tmpRowKeyColumns.length; i++) {
-                    if(tmpRowKeyColumns[i].column == dimension.column[0])
-                        break;
-                }
-                if(i == tmpRowKeyColumns.length) {
-                    tmpRowKeyColumns.push({
-                        "column": dimension.column[0],
-                        "length": 0,
-                        "dictionary": "true",
-                        "mandatory": false
-                    });
-                    tmpAggregationItems.push(dimension.column[0]);
-                }
+            //end loading
+            loadingRequest.hide();
+
+          });
+        }
+      }
+      else {
+        $scope.saveCubeRollBack();
+      }
+    });
+  };
+
+  //save streaming
+  $scope.postData = {};
+  $scope.saveStreaming = function () {
+    $scope.kafkaMeta.name = $scope.streamingMeta.name;
+
+    $scope.postData.streamingMeta = angular.toJson($scope.streamingMeta, true);
+    $scope.postData.kafkaMeta = angular.toJson($scope.kafkaMeta, true);
+    SweetAlert.swal({
+      title: '',
+      text: 'Are you sure to save Streaming ?',
+      type: '',
+      showCancelButton: true,
+      confirmButtonColor: '#DD6B55',
+      confirmButtonText: "Yes",
+      closeOnConfirm: true
+    }, function (isConfirm) {
+      if (isConfirm) {
+        loadingRequest.show();
+
+        if ($scope.modelMode == "editExistStreaming") {
+          StreamingService.update({}, {
+            project: $scope.projectModel.selectedProject,
+            streamingConfig: $scope.postData.streamingMeta,
+            kafkaConfig: $scope.postData.kafkaMeta
+          }, function (request) {
+            if (request.successful) {
+              SweetAlert.swal('', 'Updated the streaming successfully.', 'success');
+              $location.path("/models");
+            } else {
+              var message = request.message;
+              var msg = !!(message) ? message : 'Failed to take action.';
+              MessageService.sendMsg($scope.streamingResultTmpl({
+                'text': msg,
+                'streamingSchema': $scope.postData.streamingMeta,
+                'kfkSchema': $scope.postData.kafkaMeta
+              }), 'error', {}, true, 'top_center');
             }
-            // hierarchy
-            if(dimension.hierarchy && dimension.column.length){
-                var hierarchyUnit = [];
-                angular.forEach(dimension.column, function (hier_column, index) {
-
-                  //use fk instead of fk as rowkey and aggregation item in hierarchy
-                  if (hier_column in fk_pk) {
-                    hier_column = fk_pk[hier_column];
-                  }
-
-                    for (var i = 0; i < tmpRowKeyColumns.length; i++) {
-                        if(tmpRowKeyColumns[i].column == hier_column)
-                            break;
-                    }
-                    if(i == tmpRowKeyColumns.length) {
-                        tmpRowKeyColumns.push({
-                            "column": hier_column,
-                            "length": 0,
-                            "dictionary": "true",
-                            "mandatory": false
-                        });
-                        tmpAggregationItems.push(hier_column);
-                    }
-                    if(hierarchyUnit.indexOf(hier_column)==-1){
-                      hierarchyUnit.push(hier_column);
-                    }
-                });
-              if(hierarchyUnit.length){
-                hierarchyItemArray.push(hierarchyUnit);
-              }
+            loadingRequest.hide();
+          })
+        } else {
+          StreamingService.save({}, {
+            project: $scope.projectModel.selectedProject,
+            streamingConfig: $scope.postData.streamingMeta,
+            kafkaConfig: $scope.postData.kafkaMeta
+          }, function (request) {
+            if (request.successful) {
+              SweetAlert.swal('', 'Created the streaming successfully.', 'success');
+              $location.path("/models");
+            } else {
+              var message = request.message;
+              var msg = !!(message) ? message : 'Failed to take action.';
+              MessageService.sendMsg($scope.streamingResultTmpl({
+                'text': msg,
+                'streamingSchema': $scope.postData.streamingMeta,
+                'kfkSchema': $scope.postData.kafkaMeta
+              }), 'error', {}, true, 'top_center');
             }
+            loadingRequest.hide();
+          })
+        }
 
-        });
+      }
+    });
 
 
-        //rm mandatory column from aggregation item
-        angular.forEach($scope.cubeMetaFrame.rowkey.rowkey_columns,function(value,index){
-                if(value.mandatory){
-                    tmpAggregationItems = _.filter(tmpAggregationItems,function(item){
-                           return item!=value.column;
-                    });
-                }
-        });
+  }
 
-        var rowkeyColumns = $scope.cubeMetaFrame.rowkey.rowkey_columns;
-        var newRowKeyColumns = sortSharedData(rowkeyColumns,tmpRowKeyColumns);
-        var increasedColumns = increasedColumn(rowkeyColumns,tmpRowKeyColumns);
-        newRowKeyColumns = newRowKeyColumns.concat(increasedColumns);
-
-        //! here get the latest rowkey_columns
-        $scope.cubeMetaFrame.rowkey.rowkey_columns = newRowKeyColumns;
-
-        if($scope.cubeMode==="editExistCube") {
-            var aggregationGroups = $scope.cubeMetaFrame.rowkey.aggregation_groups;
-            // rm unused item from group,will only rm when [edit] dimension
-            angular.forEach(aggregationGroups, function (group, index) {
-                if (group) {
-                    for (var j = 0; j < group.length; j++) {
-                        var elemStillExist = false;
-                        for (var k = 0; k < tmpAggregationItems.length; k++) {
-                            if (group[j] == tmpAggregationItems[k]) {
-                                elemStillExist = true;
-                                break;
-                            }
-                        }
-                        if (!elemStillExist) {
-                            group.splice(j, 1);
-                            j--;
-                        }
-                    }
-                    if (!group.length) {
-                        aggregationGroups.splice(index, 1);
-                        index--;
-                    }
-                }
-                else {
-                    aggregationGroups.splice(index, 1);
-                    index--;
-                }
-            });
+
+  // Shifts the model's partition_date_start by the local timezone offset (minutes -> ms); no-op without a model or start date.
+  $scope.saveCubeRollBack = function () {
+    var model = $scope.metaModel.model;
+    if (!model || !(model.partition_desc.partition_date_start || model.partition_desc.partition_date_start == 0)) { return; }
+    model.partition_desc.partition_date_start += new Date().getTimezoneOffset() * 60000;
+  }
+
+  $scope.updateMandatory = function (rowkey_column) {
+    if (!rowkey_column.mandatory) { // acts only while the column's mandatory flag is falsy
+      angular.forEach($scope.cubeMetaFrame.rowkey.aggregation_groups, function (group, index) {
+        var index = group.indexOf(rowkey_column.column); // re-declares and overwrites the callback's own `index` argument
+        if (index > -1) {
+          group.splice(index, 1); // drop the column from this aggregation group
         }
+      });
+    }
+  }
 
-        if($scope.cubeMode==="addNewCube"){
+  function reGenerateRowKey() {
+    $log.log("reGen rowkey & agg group");
+    var fk_pk = {};
+    var tmpRowKeyColumns = [];
+    var tmpAggregationItems = [];//put all aggregation item
+    var hierarchyItemArray = [];//put all hierarchy items
+    angular.forEach($scope.cubeMetaFrame.dimensions, function (dimension, index) {
+
+      // build fk_pk map
+      angular.forEach($scope.metaModel.model.lookups, function (_lookup, index) {
+        for (var i = 0; i < _lookup.join.foreign_key.length; i++) {
+          fk_pk[_lookup.join.primary_key[i]] = _lookup.join.foreign_key[i];
+        }
+      });
 
-          if(!tmpAggregationItems.length) {
-              $scope.cubeMetaFrame.rowkey.aggregation_groups=[];
-              return;
+      //derived column
+      if (dimension.derived && dimension.derived.length) {
+        var lookup = _.find($scope.metaModel.model.lookups, function (lookup) {
+          return lookup.table == dimension.table
+        });
+        angular.forEach(lookup.join.foreign_key, function (fk, index) {
+          for (var i = 0; i < tmpRowKeyColumns.length; i++) {
+            if (tmpRowKeyColumns[i].column == fk)
+              break;
           }
-
-            var newUniqAggregationItem = [];
-            angular.forEach(tmpAggregationItems, function (item, index) {
-                if(newUniqAggregationItem.indexOf(item)==-1){
-                    newUniqAggregationItem.push(item);
-                }
+          // push to array if no duplicate value
+          if (i == tmpRowKeyColumns.length) {
+            tmpRowKeyColumns.push({
+              "column": fk,
+              "length": 0,
+              "dictionary": "true",
+              "mandatory": false
             });
 
-          //distinct hierarchyItem
-          var hierarchyItems = hierarchyItemArray.join().split(",");
-          var _hierarchyItems = [];
-          angular.forEach(hierarchyItems, function (item, index) {
-            if (_hierarchyItems.indexOf(item) == -1) {
-              _hierarchyItems.push(item);
-            }
+            tmpAggregationItems.push(fk);
+          }
+        })
+
+      }
+      //normal column
+      else if (dimension.column && !dimension.hierarchy && dimension.column.length == 1) {
+        for (var i = 0; i < tmpRowKeyColumns.length; i++) {
+          if (tmpRowKeyColumns[i].column == dimension.column[0])
+            break;
+        }
+        if (i == tmpRowKeyColumns.length) {
+          tmpRowKeyColumns.push({
+            "column": dimension.column[0],
+            "length": 0,
+            "dictionary": "true",
+            "mandatory": false
           });
-          hierarchyItems = _hierarchyItems;
-
-          var unHierarchyItems = increasedData(hierarchyItems,newUniqAggregationItem);
-          //hierarchyItems
-          var increasedDataGroups = sliceGroupItemToGroups(unHierarchyItems);
-          if(!hierarchyItemArray.length){
-              $scope.cubeMetaFrame.rowkey.aggregation_groups = increasedDataGroups;
-              return;
-          };
-
-          var lastAggregationGroup = increasedDataGroups.length===0?[]:increasedDataGroups[increasedDataGroups.length-1];
-
-          if(lastAggregationGroup.length<10){
-            if(lastAggregationGroup.length+hierarchyItemArray.length<=10){
-              lastAggregationGroup = lastAggregationGroup.concat(hierarchyItems);
-              if(increasedDataGroups.length==0){
-                //case only hierarchy
-                increasedDataGroups[0]=lastAggregationGroup;
-              }else{
-                increasedDataGroups[increasedDataGroups.length-1]=lastAggregationGroup;
+          tmpAggregationItems.push(dimension.column[0]);
+        }
+      }
+      // hierarchy
+      if (dimension.hierarchy && dimension.column.length) {
+        var hierarchyUnit = [];
+        angular.forEach(dimension.column, function (hier_column, index) {
+
+          //use fk instead of pk as rowkey and aggregation item in hierarchy
+          if (hier_column in fk_pk) {
+            hier_column = fk_pk[hier_column];
+          }
+
+          for (var i = 0; i < tmpRowKeyColumns.length; i++) {
+            if (tmpRowKeyColumns[i].column == hier_column)
+              break;
+          }
+          if (i == tmpRowKeyColumns.length) {
+            tmpRowKeyColumns.push({
+              "column": hier_column,
+              "length": 0,
+              "dictionary": "true",
+              "mandatory": false
+            });
+            tmpAggregationItems.push(hier_column);
+          }
+          if (hierarchyUnit.indexOf(hier_column) == -1) {
+            hierarchyUnit.push(hier_column);
+          }
+        });
+        if (hierarchyUnit.length) {
+          hierarchyItemArray.push(hierarchyUnit);
+        }
+      }
+
+    });
+
+
+    //rm mandatory column from aggregation item
+    angular.forEach($scope.cubeMetaFrame.rowkey.rowkey_columns, function (value, index) {
+      if (value.mandatory) {
+        tmpAggregationItems = _.filter(tmpAggregationItems, function (item) {
+          return item != value.column;
+        });
+      }
+    });
+
+    var rowkeyColumns = $scope.cubeMetaFrame.rowkey.rowkey_columns;
+    var newRowKeyColumns = sortSharedData(rowkeyColumns, tmpRowKeyColumns);
+    var increasedColumns = increasedColumn(rowkeyColumns, tmpRowKeyColumns);
+    newRowKeyColumns = newRowKeyColumns.concat(increasedColumns);
+
+    //! here get the latest rowkey_columns
+    $scope.cubeMetaFrame.rowkey.rowkey_columns = newRowKeyColumns;
+
+    if ($scope.cubeMode === "editExistCube") {
+      var aggregationGroups = $scope.cubeMetaFrame.rowkey.aggregation_groups;
+      // remove items that are no longer in use from each group; this only happens when editing an existing cube
+      angular.forEach(aggregationGroups, function (group, index) {
+        if (group) {
+          for (var j = 0; j < group.length; j++) {
+            var elemStillExist = false;
+            for (var k = 0; k < tmpAggregationItems.length; k++) {
+              if (group[j] == tmpAggregationItems[k]) {
+                elemStillExist = true;
+                break;
               }
             }
-            else{
-                var cutIndex = 10-lastAggregationGroup.length;
-                var partialHierarchy =hierarchyItemArray.slice(0,cutIndex).join().split(",");
-                //add hierarchy to last group and make sure length less than 10
-                lastAggregationGroup = lastAggregationGroup.concat(partialHierarchy);
-                increasedDataGroups[increasedDataGroups.length-1]=lastAggregationGroup;
-                var leftHierarchy = hierarchyItemArray.slice(cutIndex);
-
-                var leftHierarchyLength = leftHierarchy.length;
-                var grpLength = parseInt(leftHierarchyLength/10);
-                if(leftHierarchyLength%10==0&&leftHierarchyLength!=0){
-                    grpLength--;
-                }
-                for(var i=0;i<=grpLength;i++){
-                    var hierAggGroupUnit = leftHierarchy.slice(i*10,(i+1)*10).join().split(",");
-                    increasedDataGroups.push(hierAggGroupUnit);
-                }
+            if (!elemStillExist) {
+              group.splice(j, 1);
+              j--;
             }
           }
-          //lastAggregationGroup length >=10
-          else{
-              var hierrachyArrayLength = hierarchyItemArray.length;
-              var grpLength = parseInt(hierrachyArrayLength/10);
-              if(hierrachyArrayLength%10==0&&hierrachyArrayLength!=0){
-                  grpLength--;
-              }
-              for(var i=0;i<=grpLength;i++){
-                   var hierAggGroupUnit = hierarchyItemArray.slice(i*10,(i+1)*10).join().split(",");
-                   increasedDataGroups.push(hierAggGroupUnit);
-              }
+          if (!group.length) {
+            aggregationGroups.splice(index, 1);
+            index--;
           }
-            //! here get the latest aggregation groups,only effect when add newCube
-            $scope.cubeMetaFrame.rowkey.aggregation_groups = increasedDataGroups;
         }
-    }
-
-    function sortSharedData(oldArray,tmpArr){
-        var newArr = [];
-        for(var j=0;j<oldArray.length;j++){
-            var unit = oldArray[j];
-            for(var k=0;k<tmpArr.length;k++){
-                if(unit.column==tmpArr[k].column){
-                    newArr.push(unit);
-                }
-            }
+        else {
+          aggregationGroups.splice(index, 1);
+          index--;
         }
-        return newArr;
+      });
     }
 
-    function increasedData(oldArray,tmpArr){
-        var increasedData = [];
-        if(oldArray&&!oldArray.length){
-            return   increasedData.concat(tmpArr);
-        }
+    if ($scope.cubeMode === "addNewCube") {
 
-        for(var j=0;j<tmpArr.length;j++){
-            var unit = tmpArr[j];
-            var exist = false;
-            for(var k=0;k<oldArray.length;k++){
-                if(unit==oldArray[k]){
-                    exist = true;
-                    break;
-                }
-            }
-            if(!exist){
-                increasedData.push(unit);
-            }
+      if (!tmpAggregationItems.length) {
+        $scope.cubeMetaFrame.rowkey.aggregation_groups = [];
+        return;
+      }
+
+      var newUniqAggregationItem = [];
+      angular.forEach(tmpAggregationItems, function (item, index) {
+        if (newUniqAggregationItem.indexOf(item) == -1) {
+          newUniqAggregationItem.push(item);
+        }
+      });
+
+      //distinct hierarchyItem
+      var hierarchyItems = hierarchyItemArray.join().split(",");
+      var _hierarchyItems = [];
+      angular.forEach(hierarchyItems, function (item, index) {
+        if (_hierarchyItems.indexOf(item) == -1) {
+          _hierarchyItems.push(item);
+        }
+      });
+      hierarchyItems = _hierarchyItems;
+
+      var unHierarchyItems = increasedData(hierarchyItems, newUniqAggregationItem);
+      //hierarchyItems
+      var increasedDataGroups = sliceGroupItemToGroups(unHierarchyItems);
+      if (!hierarchyItemArray.length) {
+        $scope.cubeMetaFrame.rowkey.aggregation_groups = increasedDataGroups;
+        return;
+      }
+      ;
+
+      var lastAggregationGroup = increasedDataGroups.length === 0 ? [] : increasedDataGroups[increasedDataGroups.length - 1];
+
+      if (lastAggregationGroup.length < 10) {
+        if (lastAggregationGroup.length + hierarchyItemArray.length <= 10) {
+          lastAggregationGroup = lastAggregationGroup.concat(hierarchyItems);
+          if (increasedDataGroups.length == 0) {
+            //case only hierarchy
+            increasedDataGroups[0] = lastAggregationGroup;
+          } else {
+            increasedDataGroups[increasedDataGroups.length - 1] = lastAggregationGroup;
+          }
+        }
+        else {
+          var cutIndex = 10 - lastAggregationGroup.length;
+          var partialHierarchy = hierarchyItemArray.slice(0, cutIndex).join().split(",");
+          //add hierarchy to last group and make sure length less than 10
+          lastAggregationGroup = lastAggregationGroup.concat(partialHierarchy);
+          increasedDataGroups[increasedDataGroups.length - 1] = lastAggregationGroup;
+          var leftHierarchy = hierarchyItemArray.slice(cutIndex);
+
+          var leftHierarchyLength = leftHierarchy.length;
+          var grpLength = parseInt(leftHierarchyLength / 10);
+          if (leftHierarchyLength % 10 == 0 && leftHierarchyLength != 0) {
+            grpLength--;
+          }
+          for (var i = 0; i <= grpLength; i++) {
+            var hierAggGroupUnit = leftHierarchy.slice(i * 10, (i + 1) * 10).join().split(",");
+            increasedDataGroups.push(hierAggGroupUnit);
+          }
+        }
+      }
+      //lastAggregationGroup length >=10
+      else {
+        var hierrachyArrayLength = hierarchyItemArray.length;
+        var grpLength = parseInt(hierrachyArrayLength / 10);
+        if (hierrachyArrayLength % 10 == 0 && hierrachyArrayLength != 0) {
+          grpLength--;
         }
-        return increasedData;
+        for (var i = 0; i <= grpLength; i++) {
+          var hierAggGroupUnit = hierarchyItemArray.slice(i * 10, (i + 1) * 10).join().split(",");
+          increasedDataGroups.push(hierAggGroupUnit);
+        }
+      }
+      //! the latest aggregation groups are assigned here; this only takes effect when adding a new cube
+      $scope.cubeMetaFrame.rowkey.aggregation_groups = increasedDataGroups;
     }
+  }
 
-    function increasedColumn(oldArray,tmpArr){
-        var increasedData = [];
-        if(oldArray&&!oldArray.length){
-         return   increasedData.concat(tmpArr);
+  function sortSharedData(oldArray, tmpArr) {
+    var newArr = [];
+    for (var j = 0; j < oldArray.length; j++) {
+      var unit = oldArray[j];
+      for (var k = 0; k < tmpArr.length; k++) {
+        if (unit.column == tmpArr[k].column) {
+          newArr.push(unit);
         }
+      }
+    }
+    return newArr;
+  }
 
-        for(var j=0;j<tmpArr.length;j++){
-            var unit = tmpArr[j];
-            var exist = false;
-            for(var k=0;k<oldArray.length;k++){
-                if(unit.column==oldArray[k].column){
-                    exist = true;
-                    break;
-                }
-            }
-            if(!exist){
-                increasedData.push(unit);
-            }
-        }
-        return increasedData;
+  function increasedData(oldArray, tmpArr) {
+    var increasedData = [];
+    if (oldArray && !oldArray.length) {
+      return increasedData.concat(tmpArr);
     }
 
-    function sliceGroupItemToGroups(groupItems){
-        if(!groupItems.length){
-            return [];
+    for (var j = 0; j < tmpArr.length; j++) {
+      var unit = tmpArr[j];
+      var exist = false;
+      for (var k = 0; k < oldArray.length; k++) {
+        if (unit == oldArray[k]) {
+          exist = true;
+          break;
         }
-        var groups = [];
-        var j = -1;
-        for(var i = 0;i<groupItems.length;i++){
-            if(i%10==0){
-                j++;
-                groups[j]=[];
-            }
-            groups[j].push(groupItems[i]);
-        }
-        return groups;
+      }
+      if (!exist) {
+        increasedData.push(unit);
+      }
     }
+    return increasedData;
+  }
 
+  function increasedColumn(oldArray, tmpArr) {
+    var increasedData = [];
+    if (oldArray && !oldArray.length) {
+      return increasedData.concat(tmpArr);
+    }
 
-    // ~ private methods
-    function generateColumnFamily() {
-        $scope.cubeMetaFrame.hbase_mapping.column_family = [];
-        var colFamily = ColFamily();
-        var normalMeasures = [], distinctCountMeasures=[];
-        angular.forEach($scope.cubeMetaFrame.measures, function (measure, index) {
-            if(measure.function.expression === 'COUNT_DISTINCT'){
-                distinctCountMeasures.push(measure);
-            }else{
-                normalMeasures.push(measure);
-            }
-        });
-        if(normalMeasures.length>0){
-            var nmcf = colFamily();
-            angular.forEach(normalMeasures, function(normalM, index){
-                nmcf.columns[0].measure_refs.push(normalM.name);
-            });
-            $scope.cubeMetaFrame.hbase_mapping.column_family.push(nmcf);
+    for (var j = 0; j < tmpArr.length; j++) {
+      var unit = tmpArr[j];
+      var exist = false;
+      for (var k = 0; k < oldArray.length; k++) {
+        if (unit.column == oldArray[k].column) {
+          exist = true;
+          break;
         }
+      }
+      if (!exist) {
+        increasedData.push(unit);
+      }
+    }
+    return increasedData;
+  }
 
-        if (distinctCountMeasures.length > 0){
-            var dccf = colFamily();
-            angular.forEach(distinctCountMeasures, function(dcm, index){
-                dccf.columns[0].measure_refs.push(dcm.name);
-            });
-            $scope.cubeMetaFrame.hbase_mapping.column_family.push(dccf);
-        }
+  function sliceGroupItemToGroups(groupItems) {
+    if (!groupItems.length) {
+      return [];
     }
+    var groups = [];
+    var j = -1;
+    for (var i = 0; i < groupItems.length; i++) {
+      if (i % 10 == 0) {
+        j++;
+        groups[j] = [];
+      }
+      groups[j].push(groupItems[i]);
+    }
+    return groups;
+  }
 
-    $scope.$watch('projectModel.selectedProject', function (newValue, oldValue) {
-        if(!newValue){
-            return;
-        }
-        var param = {
-            ext: true,
-            project:newValue
-        };
-        if(newValue){
-            TableModel.initTables();
-            TableService.list(param, function (tables) {
-                angular.forEach(tables, function (table) {
-                    table.name = table.database+"."+table.name;
-                    TableModel.addTable(table);
-                });
-            });
-        }
+
+  // ~ private methods
+  function generateColumnFamily() {
+    $scope.cubeMetaFrame.hbase_mapping.column_family = [];
+    var colFamily = ColFamily();
+    var normalMeasures = [], distinctCountMeasures = [];
+    angular.forEach($scope.cubeMetaFrame.measures, function (measure, index) {
+      if (measure.function.expression === 'COUNT_DISTINCT') {
+        distinctCountMeasures.push(measure);
+      } else {
+        normalMeasures.push(measure);
+      }
     });
+    if (normalMeasures.length > 0) {
+      var nmcf = colFamily();
+      angular.forEach(normalMeasures, function (normalM, index) {
+        nmcf.columns[0].measure_refs.push(normalM.name);
+      });
+      $scope.cubeMetaFrame.hbase_mapping.column_family.push(nmcf);
+    }
+
+    if (distinctCountMeasures.length > 0) {
+      var dccf = colFamily();
+      angular.forEach(distinctCountMeasures, function (dcm, index) {
+        dccf.columns[0].measure_refs.push(dcm.name);
+      });
+      $scope.cubeMetaFrame.hbase_mapping.column_family.push(dccf);
+    }
+  }
+
+  $scope.$watch('projectModel.selectedProject', function (newValue, oldValue) {
+    if (!newValue) {
+      return;
+    }
+    var param = {
+      ext: true,
+      project: newValue
+    };
+    if (newValue) {
+      TableModel.initTables();
+      TableService.list(param, function (tables) {
+        angular.forEach(tables, function (table) {
+          table.name = table.database + "." + table.name;
+          TableModel.addTable(table);
+        });
+      });
+    }
+  });
 
 
   //dimensions options is depend on the model input when add cube
   $scope.$watch('cubeMetaFrame.model_name', function (newValue, oldValue) {
-    if(!newValue){
+    if (!newValue) {
       return;
     }
-    $scope.metaModel.model=modelsManager.getModel(newValue);
+    $scope.metaModel.model = modelsManager.getModel(newValue);
+
+    if(!$scope.metaModel.model){
+      return;
+    }
+    var factTable = $scope.metaModel.model.fact_table;
+    var cols = $scope.getColumnsByTable(factTable);
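+    // default the Kafka parser properties to the first fact-table timestamp column whose name does not contain "_TS"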
+    for(var i=0;i<cols.length;i++){
+      var col = cols[i];
+      if(col.datatype === "timestamp"&&col.name.indexOf("_TS")==-1){
+        $scope.kafkaMeta.parserProperties = "tsColName="+col.name+";formatTs=TRUE";
+        break;
+      }
+    }
+
+  });
+  $scope.$on('DimensionsEdited', function (event) {
+    if ($scope.cubeMetaFrame) {
+      reGenerateRowKey();
+    }
   });
-    $scope.$on('DimensionsEdited', function (event) {
-        if ($scope.cubeMetaFrame) {
-            reGenerateRowKey();
-        }
-    });
 });

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubeFilter.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeFilter.js b/webapp/app/js/controllers/cubeFilter.js
deleted file mode 100644
index 3953169..0000000
--- a/webapp/app/js/controllers/cubeFilter.js
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-'use strict';
-
-KylinApp.controller('CubeFilterCtrl', function ($scope, $modal,cubeConfig,MetaModel) {
-
-});

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubeMeasures.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeMeasures.js b/webapp/app/js/controllers/cubeMeasures.js
new file mode 100644
index 0000000..cdcd3cf
--- /dev/null
+++ b/webapp/app/js/controllers/cubeMeasures.js
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+'use strict';
+
+KylinApp.controller('CubeMeasuresCtrl', function ($scope, $modal,MetaModel,cubesManager,CubeDescModel) {
+
+  $scope.addNewMeasure = function (measure) {
+    $scope.newMeasure = (!!measure)? measure:CubeDescModel.createMeasure();
+  };
+
+  $scope.removeElement = function (arr, element) {
+    var index = arr.indexOf(element);
+    if (index > -1) {
+      arr.splice(index, 1);
+    }
+  };
+
+  $scope.clearNewMeasure = function () {
+    $scope.newMeasure = null;
+  };
+
+  $scope.saveNewMeasure = function () {
+    if ($scope.cubeMetaFrame.measures.indexOf($scope.newMeasure) === -1) {
+      $scope.cubeMetaFrame.measures.push($scope.newMeasure);
+    }
+    $scope.newMeasure = null;
+  };
+
+  //map right return type for param
+  $scope.measureReturnTypeUpdate = function(){
+    if($scope.newMeasure.function.parameter.type=="constant"&&$scope.newMeasure.function.expression!=="COUNT_DISTINCT"){
+      switch($scope.newMeasure.function.expression){
+        case "SUM":
+        case "COUNT":
+          $scope.newMeasure.function.returntype = "bigint";
+          break;
+        default:
+          $scope.newMeasure.function.returntype = "";
+          break;
+      }
+    }
+    if($scope.newMeasure.function.parameter.type=="column"&&$scope.newMeasure.function.expression!=="COUNT_DISTINCT"){
+
+      var column = $scope.newMeasure.function.parameter.value;
+      var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table); // $scope.getColumnType defined in cubeEdit.js
+
+      if(colType==""||!colType){
+        $scope.newMeasure.function.returntype = "";
+        return;
+      }
+
+
+      switch($scope.newMeasure.function.expression){
+        case "SUM":
+          if(colType==="smallint"||colType==="int"||colType==="bigint"||colType==="integer"){
+            $scope.newMeasure.function.returntype= 'bigint';
+          }else{
+            if(colType.indexOf('decimal')!=-1){
+              $scope.newMeasure.function.returntype= colType;
+            }else{
+              $scope.newMeasure.function.returntype= 'decimal';
+            }
+          }
+          break;
+        case "MIN":
+        case "MAX":
+          $scope.newMeasure.function.returntype = colType;
+          break;
+        case "COUNT":
+          $scope.newMeasure.function.returntype = "bigint";
+          break;
+        default:
+          $scope.newMeasure.function.returntype = "";
+          break;
+      }
+    }
+  }
+
+
+});

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index d94316e..b714c41 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -18,8 +18,7 @@
 
 'use strict';
 
-KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager) {
-
+KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager,StreamingService) {
     $scope.modelsManager = modelsManager;
     $scope.cubesManager = cubesManager;
     $scope.projects = [];
@@ -34,6 +33,7 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
     if (UserService.hasRole("ROLE_ADMIN")) {
             $scope.wizardSteps.push({title: 'Advanced Setting', src: 'partials/cubeDesigner/advanced_settings.html', isComplete: false,form:'cube_setting_form'});
     }
+    $scope.wizardSteps.push({title: 'Streaming', src: 'partials/cubeDesigner/streamingConfig.html', isComplete: false,form:'cube_streaming_form'});
     $scope.wizardSteps.push({title: 'Overview', src: 'partials/cubeDesigner/overview.html', isComplete: false,form:null});
 
     $scope.curStep = $scope.wizardSteps[0];
@@ -60,89 +60,6 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
         return $scope.userService.hasRole('ROLE_ADMIN') || $scope.hasPermission(project,$scope.permissions.ADMINISTRATION.mask);
     };
 
-    $scope.addNewMeasure = function (measure) {
-        $scope.newMeasure = (!!measure)? measure:CubeDescModel.createMeasure();
-    };
-
-    $scope.clearNewMeasure = function () {
-        $scope.newMeasure = null;
-    };
-
-    $scope.saveNewMeasure = function () {
-        if ($scope.cubeMetaFrame.measures.indexOf($scope.newMeasure) === -1) {
-            $scope.cubeMetaFrame.measures.push($scope.newMeasure);
-        }
-        $scope.newMeasure = null;
-    };
-
-    //map right return type for param
-    $scope.measureReturnTypeUpdate = function(){
-        if($scope.newMeasure.function.parameter.type=="constant"&&$scope.newMeasure.function.expression!=="COUNT_DISTINCT"){
-            switch($scope.newMeasure.function.expression){
-                case "SUM":
-                case "COUNT":
-                    $scope.newMeasure.function.returntype = "bigint";
-                    break;
-                default:
-                    $scope.newMeasure.function.returntype = "";
-                    break;
-            }
-        }
-        if($scope.newMeasure.function.parameter.type=="column"&&$scope.newMeasure.function.expression!=="COUNT_DISTINCT"){
-
-            var column = $scope.newMeasure.function.parameter.value;
-            var colType = $scope.getColumnType(column, $scope.metaModel.model.fact_table); // $scope.getColumnType defined in cubeEdit.js
-
-            if(colType==""||!colType){
-                $scope.newMeasure.function.returntype = "";
-                return;
-            }
-
-
-            switch($scope.newMeasure.function.expression){
-                case "SUM":
-                    if(colType==="smallint"||colType==="int"||colType==="bigint"||colType==="integer"){
-                        $scope.newMeasure.function.returntype= 'bigint';
-                    }else{
-                        if(colType.indexOf('decimal')!=-1){
-                            $scope.newMeasure.function.returntype= colType;
-                        }else{
-                            $scope.newMeasure.function.returntype= 'decimal';
-                        }
-                    }
-                    break;
-                case "MIN":
-                case "MAX":
-                    $scope.newMeasure.function.returntype = colType;
-                    break;
-                case "COUNT":
-                    $scope.newMeasure.function.returntype = "bigint";
-                    break;
-                default:
-                    $scope.newMeasure.function.returntype = "";
-                    break;
-            }
-        }
-    }
-
-    $scope.addNewRowkeyColumn = function () {
-        $scope.cubeMetaFrame.rowkey.rowkey_columns.push({
-            "column": "",
-            "length": 0,
-            "dictionary": "true",
-            "mandatory": false
-        });
-    };
-
-    $scope.addNewAggregationGroup = function () {
-        $scope.cubeMetaFrame.rowkey.aggregation_groups.push([]);
-    };
-
-    $scope.refreshAggregationGroup = function (list, index, aggregation_groups) {
-        if (aggregation_groups) {
-            list[index] = aggregation_groups;
-        }
-    };
 
     $scope.removeElement = function (arr, element) {
         var index = arr.indexOf(element);
@@ -214,6 +131,16 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
       }
       $scope.metaModel.model=modelsManager.getModel($scope.cubeMetaFrame.model_name);
 
+      StreamingService.getConfig({cubeName:$scope.cubeMetaFrame.name}, function (kfkConfigs) {
+        if(!!kfkConfigs[0]&&kfkConfigs[0].cubeName == $scope.cubeMetaFrame.name){
+          $scope.cubeState.isStreaming = true;
+          $scope.streamingMeta = kfkConfigs[0];
+          StreamingService.getKfkConfig({kafkaConfigName:$scope.streamingMeta.name}, function (streamings) {
+            $scope.kafkaMeta = streamings[0];
+          })
+        }
+      })
+
     }
   });
 
@@ -242,6 +169,9 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
             }else{
                 //business rule check
                 switch($scope.curStep.form){
+                    case 'cube_info_form':
+                      return $scope.check_cube_info();
+                      break;
                     case 'cube_dimension_form':
                         return $scope.check_cube_dimension();
                         break;
@@ -250,6 +180,8 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
                         break;
                     case 'cube_setting_form':
                         return $scope.check_cube_setting();
+                    case 'cube_streaming_form':
+                        return $scope.kafka_ad_config_form();
                     default:
                         return true;
                         break;
@@ -258,6 +190,9 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
         }
     };
 
+  $scope.check_cube_info = function(){ // business-rule check for the 'cube_info_form' step; no rules defined yet
+    return true; // the form switch returns this value directly, so an empty body (undefined) would read as "invalid"
+  }
 
     $scope.check_cube_dimension = function(){
         var errors = [];
@@ -327,6 +262,31 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
         }
     }
 
+  /*
+   * Streaming step check: at least one Kafka cluster, each with at least one broker.
+   * kafkaMeta is loaded by an asynchronous request, so it may still be unset here.
+   */
+  $scope.kafka_ad_config_form = function(){
+    if(!$scope.cubeState.isStreaming){
+      return true;
+    }
+    var errors = [];
+    var clusters = ($scope.kafkaMeta && $scope.kafkaMeta.clusters) || [];
+    if(!clusters.length){
+      errors.push("Cluster can't be null");
+    }
+    angular.forEach(clusters,function(cluster,index){
+      if(!cluster.brokers || !cluster.brokers.length){
+        errors.push("No broker under Cluster-"+(index+1));
+      }
+    })
+    if(errors.length){
+      SweetAlert.swal('', "\n" + errors.join("\n"), 'warning');
+      return false;
+    }
+    return true;
+  }
+
 
     // ~ private methods
     function initProject() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubes.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js
index 886aa6b..843ad7a 100644
--- a/webapp/app/js/controllers/cubes.js
+++ b/webapp/app/js/controllers/cubes.js
@@ -19,7 +19,7 @@
 'use strict';
 
 KylinApp
-  .controller('CubesCtrl', function ($scope, $q, $routeParams, $location, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, cubeConfig, ProjectModel, ModelService, MetaModel, CubeList,modelsManager,cubesManager) {
+  .controller('CubesCtrl', function ($scope, $q, $routeParams, $location, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, cubeConfig, ProjectModel, ModelService, MetaModel, CubeList,modelsManager,cubesManager,StreamingList) {
 
     $scope.cubeConfig = cubeConfig;
     $scope.cubeList = CubeList;
@@ -62,9 +62,21 @@ KylinApp
       $scope.loading = true;
 
       return CubeList.list(queryParam).then(function (resp) {
-        $scope.loading = false;
-        defer.resolve(resp);
-        return defer.promise;
+
+        StreamingList.list().then(function(_resp){
+          angular.forEach($scope.cubeList.cubes,function(item,index){
+            var result = StreamingList.checkCubeExist(item.name);
+            if(result.exist == true){
+              item.streaming = result.streaming;
+              var kfkConfig = StreamingList.getKafkaConfig(result.streaming.name);
+              item.kfkConfig = kfkConfig;
+            }
+          })
+          $scope.loading = false;
+          defer.resolve(resp);
+          return defer.promise;
+        })
+
       },function(resp){
         $scope.loading = false;
         defer.resolve([]);
@@ -355,7 +367,6 @@ KylinApp
     $scope.cubeEdit = function (cube) {
       $location.path("cubes/edit/" + cube.name);
     }
-
     $scope.startMerge = function (cube) {
       $scope.metaModel={
         model:modelsManager.getModelByCube(cube.name)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/modelSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/modelSchema.js b/webapp/app/js/controllers/modelSchema.js
index a2126e8..ccb0013 100644
--- a/webapp/app/js/controllers/modelSchema.js
+++ b/webapp/app/js/controllers/modelSchema.js
@@ -14,235 +14,278 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 'use strict';
 
-KylinApp.controller('ModelSchemaCtrl', function ($scope, QueryService, UserService, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,$log,SweetAlert,modelsManager) {
-
-    $scope.modelsManager = modelsManager;
-
-    $scope.projects = [];
-    $scope.newDimension = null;
-    $scope.newMeasure = null;
-
-    $scope.forms = {};
-
-
-    $scope.wizardSteps = [
-        {title: 'Model Info', src: 'partials/modelDesigner/model_info.html', isComplete: false,form:'model_info_form'},
-        {title: 'Data Model', src: 'partials/modelDesigner/data_model.html', isComplete: false,form:'data_model_form'},
-        {title: 'Dimensions', src: 'partials/modelDesigner/model_dimensions.html', isComplete: false,form:'model_dimensions_form'},
-        {title: 'Measures', src: 'partials/modelDesigner/model_measures.html', isComplete: false,form:'model_measure_form'},
-        {title: 'Settings', src: 'partials/modelDesigner/conditions_settings.html', isComplete: false,form:'model_setting_form'}
-    ];
+KylinApp.controller('ModelSchemaCtrl', function ($scope, QueryService, UserService, ProjectService, $q, AuthenticationService, $filter, ModelService, MetaModel, CubeDescModel, CubeList, TableModel, ProjectModel, $log, SweetAlert, modelsManager) {
+
+  $scope.modelsManager = modelsManager;
+  $scope.projectModel = ProjectModel;
+  $scope.projects = [];
+  $scope.newDimension = null;
+  $scope.newMeasure = null;
+
+  $scope.forms = {};
+
+
+  $scope.wizardSteps = [
+    {title: 'Model Info', src: 'partials/modelDesigner/model_info.html', isComplete: false, form: 'model_info_form'},
+    {title: 'Data Model', src: 'partials/modelDesigner/data_model.html', isComplete: false, form: 'data_model_form'},
+    {
+      title: 'Dimensions',
+      src: 'partials/modelDesigner/model_dimensions.html',
+      isComplete: false,
+      form: 'model_dimensions_form'
+    },
+    {
+      title: 'Measures',
+      src: 'partials/modelDesigner/model_measures.html',
+      isComplete: false,
+      form: 'model_measure_form'
+    },
+    {
+      title: 'Settings',
+      src: 'partials/modelDesigner/conditions_settings.html',
+      isComplete: false,
+      form: 'model_setting_form'
+    }
+  ];
 
-    $scope.curStep = $scope.wizardSteps[0];
+  $scope.curStep = $scope.wizardSteps[0];
 
 
-    // ~ init
-    if (!$scope.state) {
-        $scope.state = {mode: "view"};
+  // ~ init
+  if (!$scope.state) {
+    $scope.state = {mode: "view"};
+  }
+  //init modelsManager
+  if ($scope.state.mode == "edit") {
+    var defer = $q.defer();
+    var queryParam = {};
+    if (!$scope.projectModel.isSelectedProjectValid()) {
+      return;
     }
+    queryParam.projectName = $scope.projectModel.selectedProject;
+    modelsManager.list(queryParam).then(function (resp) {
+      defer.resolve(resp);
+      modelsManager.loading = false;
+      return defer.promise;
+    });
+  }
 
-    $scope.$watch('model', function (newValue, oldValue) {
-        if(!newValue){
-            return;
-        }
-        if ($scope.modelMode=="editExistModel"&&newValue && !newValue.project) {
-            initProject();
-        }
+  $scope.$watch('model', function (newValue, oldValue) {
+    if (!newValue) {
+      return;
+    }
+    if ($scope.modelMode == "editExistModel" && newValue && !newValue.project) {
+      initProject();
+    }
 
-    });
+  });
 
-    // ~ public methods
-    $scope.filterProj = function(project){
-        return $scope.userService.hasRole('ROLE_ADMIN') || $scope.hasPermission(project,$scope.permissions.ADMINISTRATION.mask);
-    };
+  // ~ public methods
+  $scope.filterProj = function (project) {
+    return $scope.userService.hasRole('ROLE_ADMIN') || $scope.hasPermission(project, $scope.permissions.ADMINISTRATION.mask);
+  };
 
-    $scope.removeElement = function (arr, element) {
-        var index = arr.indexOf(element);
-        if (index > -1) {
-            arr.splice(index, 1);
-        }
-    };
+  $scope.removeElement = function (arr, element) {
+    var index = arr.indexOf(element);
+    if (index > -1) {
+      arr.splice(index, 1);
+    }
+  };
 
-    $scope.open = function ($event) {
-        $event.preventDefault();
-        $event.stopPropagation();
+  $scope.open = function ($event) {
+    $event.preventDefault();
+    $event.stopPropagation();
 
-        $scope.opened = true;
-    };
+    $scope.opened = true;
+  };
 
-    $scope.preView = function () {
-        var stepIndex = $scope.wizardSteps.indexOf($scope.curStep);
-        if (stepIndex >= 1) {
-            $scope.curStep.isComplete = false;
-            $scope.curStep = $scope.wizardSteps[stepIndex - 1];
-        }
-    };
+  $scope.preView = function () {
+    var stepIndex = $scope.wizardSteps.indexOf($scope.curStep);
+    if (stepIndex >= 1) {
+      $scope.curStep.isComplete = false;
+      $scope.curStep = $scope.wizardSteps[stepIndex - 1];
+    }
+  };
 
-    $scope.nextView = function () {
-        var stepIndex = $scope.wizardSteps.indexOf($scope.curStep);
+  $scope.nextView = function () {
+    var stepIndex = $scope.wizardSteps.indexOf($scope.curStep);
 
-        if (stepIndex < ($scope.wizardSteps.length - 1)) {
-            $scope.curStep.isComplete = true;
-            $scope.curStep = $scope.wizardSteps[stepIndex + 1];
+    if (stepIndex < ($scope.wizardSteps.length - 1)) {
+      $scope.curStep.isComplete = true;
+      $scope.curStep = $scope.wizardSteps[stepIndex + 1];
 
-            AuthenticationService.ping(function (data) {
-                UserService.setCurUser(data);
-            });
-        }
-    };
+      AuthenticationService.ping(function (data) {
+        UserService.setCurUser(data);
+      });
+    }
+  };
 
-    $scope.checkForm = function(){
-        if(!$scope.curStep.form){
-            return true;
-        }
-        if($scope.state.mode==='view'){
+  $scope.checkForm = function () {
+    if (!$scope.curStep.form) {
+      return true;
+    }
+    if ($scope.state.mode === 'view') {
+      return true;
+    }
+    else {
+      //form validation
+      if ($scope.forms[$scope.curStep.form].$invalid) {
+        $scope.forms[$scope.curStep.form].$sbumitted = true;
+        return false;
+      } else {
+        //business rule check
+        switch ($scope.curStep.form) {
+          case 'data_model_form':
+            return $scope.check_data_model();
+            break;
+          case 'model_info_form':
+            return $scope.check_model_info();
+            break;
+          case 'model_dimensions_form':
+            return $scope.check_model_dimensions();
+            break;
+          case 'model_measure_form':
+            return $scope.check_model_measure();
+            break;
+          case 'model_setting_form':
+            return $scope.check_model_setting();
+            break;
+          default:
             return true;
+            break;
         }
-        else{
-            //form validation
-            if($scope.forms[$scope.curStep.form].$invalid){
-                $scope.forms[$scope.curStep.form].$sbumitted=true;
-                return false;
-            }else{
-                //business rule check
-                switch($scope.curStep.form){
-                    case 'data_model_form':
-                        return $scope.check_data_model();
-                         break;
-                    case 'model_dimensions_form':
-                        return $scope.check_model_dimensions();
-                         break;
-                    case 'model_measure_form':
-                        return $scope.check_model_measure();
-                         break;
-                    case 'model_setting_form':
-                        return $scope.check_model_setting();
-                         break;
-                    default:
-                        return true;
-                        break;
-                }
-            }
-        }
-    };
+      }
+    }
+  };
+
+  $scope.check_model_info = function () {
 
-    /*
-     * lookups can't be null
-     */
-    $scope.check_data_model = function(){
-        var errors = [];
+    var modelName = $scope.modelsManager.selectedModel.name.toUpperCase();
+    var models = $scope.modelsManager.modelNameList;
+    if (models.indexOf(modelName) != -1) {
+      SweetAlert.swal('', "Model named [" + modelName + "] already exist!", 'warning');
+      return false;
+    }
+    return true;
+  }
+
+  /*
+   * lookups can't be null
+   */
+  $scope.check_data_model = function () {
+    var errors = [];
 //        if(!modelsManager.selectedModel.lookups.length){
 //            errors.push("No lookup table defined");
 //        }
-        var errorInfo = "";
-        angular.forEach(errors,function(item){
-            errorInfo+="\n"+item;
-        });
-        if(errors.length){
-            SweetAlert.swal('', errorInfo, 'warning');
-            return false;
-        }else{
-            return true;
-        }
-
+    var errorInfo = "";
+    angular.forEach(errors, function (item) {
+      errorInfo += "\n" + item;
+    });
+    if (errors.length) {
+      SweetAlert.swal('', errorInfo, 'warning');
+      return false;
+    } else {
+      return true;
     }
 
-    /*
-    * dimensions validation
-    * 1.dimension can't be null
-    */
-    $scope.check_model_dimensions = function(){
+  }
 
-        var errors = [];
-        if(!modelsManager.selectedModel.dimensions.length){
-            errors.push("No dimensions defined.");
-        }
-        angular.forEach(modelsManager.selectedModel.dimensions,function(_dimension){
-            if(!_dimension.columns||!_dimension.columns.length){
-            errors.push("No dimension columns defined for Table["+_dimension.table+"]");
-            }
-        });
-        var errorInfo = "";
-        angular.forEach(errors,function(item){
-            errorInfo+="\n"+item;
-        });
-        if(errors.length){
-            SweetAlert.swal('Oops...', errorInfo, 'warning');
-            return false;
-        }else{
-            return true;
-        }
+  /*
+   * dimensions validation
+   * 1.dimension can't be null
+   */
+  $scope.check_model_dimensions = function () {
 
-    };
+    var errors = [];
+    if (!modelsManager.selectedModel.dimensions.length) {
+      errors.push("No dimensions defined.");
+    }
+    angular.forEach(modelsManager.selectedModel.dimensions, function (_dimension) {
+      if (!_dimension.columns || !_dimension.columns.length) {
+        errors.push("No dimension columns defined for Table[" + _dimension.table + "]");
+      }
+    });
+    var errorInfo = "";
+    angular.forEach(errors, function (item) {
+      errorInfo += "\n" + item;
+    });
+    if (errors.length) {
+      SweetAlert.swal('Oops...', errorInfo, 'warning');
+      return false;
+    } else {
+      return true;
+    }
 
-    /*
-     * dimensions validation
-     * 1.metric can't be null
-     */
-    $scope.check_model_measure = function(){
+  };
 
-        var errors = [];
-        if(!modelsManager.selectedModel.metrics||!modelsManager.selectedModel.metrics.length){
-            errors.push("Please define your metrics.");
-        }
-        var errorInfo = "";
-        angular.forEach(errors,function(item){
-            errorInfo+="\n"+item;
-        });
-        if(errors.length){
-            SweetAlert.swal('', errorInfo, 'warning');
-            return false;
-        }else{
-            return true;
-        }
+  /*
+   * dimensions validation
+   * 1.metric can't be null
+   */
+  $scope.check_model_measure = function () {
 
-    };
-    $scope.check_model_setting = function(){
-        var errors = [];
-        if(modelsManager.selectedModel.partition_desc.partition_date_column!=null&& modelsManager.selectedModel.partition_desc.partition_date_start==null){
-            errors.push("Please indicate start date when partition date column selected.");
-        }
-        var errorInfo = "";
-        angular.forEach(errors,function(item){
-            errorInfo+="\n"+item;
-        });
-        if(errors.length){
-            SweetAlert.swal('', errorInfo, 'warning');
-            return false;
-        }else{
-            return true;
-        }
+    var errors = [];
+    if (!modelsManager.selectedModel.metrics || !modelsManager.selectedModel.metrics.length) {
+      errors.push("Please define your metrics.");
+    }
+    var errorInfo = "";
+    angular.forEach(errors, function (item) {
+      errorInfo += "\n" + item;
+    });
+    if (errors.length) {
+      SweetAlert.swal('', errorInfo, 'warning');
+      return false;
+    } else {
+      return true;
     }
 
+  };
+  $scope.check_model_setting = function () {
+    var errors = [];
+    if (modelsManager.selectedModel.partition_desc.partition_date_column != null && modelsManager.selectedModel.partition_desc.partition_date_start == null) {
+      errors.push("Please indicate start date when partition date column selected.");
+    }
+    var errorInfo = "";
+    angular.forEach(errors, function (item) {
+      errorInfo += "\n" + item;
+    });
+    if (errors.length) {
+      SweetAlert.swal('', errorInfo, 'warning');
+      return false;
+    } else {
+      return true;
+    }
+  }
 
-    $scope.goToStep = function(stepIndex){
-        if($scope.state.mode=="edit"){
-            if(stepIndex+1>=$scope.curStep.step){
-                return;
-            }
-        }
-        for(var i=0;i<$scope.wizardSteps.length;i++){
-            if(i<=stepIndex){
-                $scope.wizardSteps[i].isComplete = true;
-            }else{
-                $scope.wizardSteps[i].isComplete = false;
-            }
-        }
-        if (stepIndex < ($scope.wizardSteps.length)) {
-            $scope.curStep = $scope.wizardSteps[stepIndex];
 
-            AuthenticationService.ping(function (data) {
-                UserService.setCurUser(data);
-            });
-        }
+  $scope.goToStep = function (stepIndex) {
+    if ($scope.state.mode == "edit") {
+      if (stepIndex + 1 >= $scope.curStep.step) {
+        return;
+      }
     }
+    for (var i = 0; i < $scope.wizardSteps.length; i++) {
+      if (i <= stepIndex) {
+        $scope.wizardSteps[i].isComplete = true;
+      } else {
+        $scope.wizardSteps[i].isComplete = false;
+      }
+    }
+    if (stepIndex < ($scope.wizardSteps.length)) {
+      $scope.curStep = $scope.wizardSteps[stepIndex];
 
-    // ~ private methods
-    function initProject() {
-
+      AuthenticationService.ping(function (data) {
+        UserService.setCurUser(data);
+      });
     }
+  }
+
+  // ~ private methods
+  function initProject() {
+
+  }
 });

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index 9d405dc..1bbe9c8 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -19,7 +19,7 @@
 'use strict';
 
 KylinApp
-  .controller('SourceMetaCtrl', function ($scope, $cacheFactory, $q, $window, $routeParams, CubeService, $modal, TableService, $route, loadingRequest, SweetAlert, tableConfig, TableModel) {
+  .controller('SourceMetaCtrl', function ($scope, $cacheFactory, $q, $window, $routeParams, CubeService, $modal, TableService, $route, loadingRequest, SweetAlert, tableConfig, TableModel,cubeConfig) {
     var $httpDefaultCache = $cacheFactory.get('$http');
     $scope.tableModel = TableModel;
     $scope.tableModel.selectedSrcDb = [];
@@ -173,10 +173,11 @@ KylinApp
       });
     };
 
-    var StreamingSourceCtrl = function ($scope, $location, $modalInstance, tableNames, MessageService, projectName, scope, tableConfig) {
+    var StreamingSourceCtrl = function ($scope, $location, $modalInstance, tableNames, MessageService, projectName, scope, tableConfig,cubeConfig) {
       $scope.streamingPrefix = "STREAMING_";
       $scope.projectName = projectName;
       $scope.tableConfig = tableConfig;
+      $scope.cubeConfig = cubeConfig;
       $scope.streaming = {
         sourceSchema: '',
         'parseResult': {}
@@ -242,12 +243,9 @@ KylinApp
           columnList = _.sortBy(columnList, function (i) { return i.type; });
         }
 
-          var timeMeasure = ['year_start','month_start','day_start','hour_start','min_start'];
+          var timeMeasure = $scope.cubeConfig.streamingAutoGenerateMeasure;
           for(var i = 0;i<timeMeasure.length;i++){
             var defaultCheck = 'Y';
-            if(timeMeasure[i]=='min_start'){
-              defaultCheck = 'N';
-            }
             columnList.push({
               'name': timeMeasure[i],
               'checked': defaultCheck,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/streamingConfig.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/streamingConfig.js b/webapp/app/js/controllers/streamingConfig.js
new file mode 100644
index 0000000..53b30e0
--- /dev/null
+++ b/webapp/app/js/controllers/streamingConfig.js
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+KylinApp.controller('streamingConfigCtrl', function ($scope, $q, $routeParams, $location, $window, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, modelConfig, ProjectModel, ModelService, MetaModel, modelsManager, cubesManager, TableModel, $animate,StreamingService,StreamingModel) {
+  // Appends an empty cluster (from StreamingModel) to $scope.kafkaMeta.clusters.
+  $scope.addCluster = function () {
+    $scope.kafkaMeta.clusters.push(StreamingModel.createKafkaCluster());
+  };
+  // Asks for confirmation, then removes the cluster from $scope.kafkaMeta.clusters.
+  $scope.removeCluster = function(cluster){
+
+    SweetAlert.swal({
+      title: '',
+      text: 'Are you sure to remove this cluster ?',
+      type: '',
+      showCancelButton: true,
+      confirmButtonColor: '#DD6B55',
+      confirmButtonText: "Yes",
+      closeOnConfirm: true
+    }, function(isConfirm) {
+      if(isConfirm) {
+        var index = $scope.kafkaMeta.clusters.indexOf(cluster);
+        if (index > -1) {
+          $scope.kafkaMeta.clusters.splice(index, 1);
+        }
+      }
+
+    })
+  }
+  // Stages a broker on cluster.newBroker: the given one, or a blank one when none is passed.
+  $scope.addBroker = function (cluster,broker) {
+    //$scope.modelsManager.selectedModel = model;
+    cluster.newBroker=(!!broker)?broker:StreamingModel.createBrokerConfig();
+  };
+  // Discards the staged broker without saving it.
+  $scope.removeNewBroker = function (cluster){
+    delete cluster.newBroker;
+  }
+  // Removes a saved broker from cluster.brokers; no-op when it is not in the list.
+  $scope.removeElement = function (cluster, element) {
+    var index = cluster.brokers.indexOf(element);
+    if (index > -1) {
+      cluster.brokers.splice(index, 1);
+    }
+  };
+  // Adds the staged broker to cluster.brokers (unless already there), then clears the stage.
+  $scope.saveNewBroker = function(cluster){
+    if (cluster.brokers.indexOf(cluster.newBroker) === -1) {
+      cluster.brokers.push(cluster.newBroker);
+    }
+    delete cluster.newBroker;
+  }
+  // Same effect as removeNewBroker: deletes cluster.newBroker.
+  $scope.clearNewBroker = function(cluster){
+    delete cluster.newBroker;
+  }
+
+});

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/streamingKafkaConfig.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/streamingKafkaConfig.js b/webapp/app/js/controllers/streamingKafkaConfig.js
new file mode 100644
index 0000000..b995aed
--- /dev/null
+++ b/webapp/app/js/controllers/streamingKafkaConfig.js
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'use strict';
+
+KylinApp.controller('kafkaConfigCtrl', function ($scope, $q, $routeParams, $location, $window, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, modelConfig, ProjectModel, ModelService, MetaModel, modelsManager, cubesManager, TableModel, $animate,StreamingService) {
+  // No scope members are defined in this controller.
+});

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/model/cubeConfig.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index 93db978..3d80c7f 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -70,5 +70,6 @@ KylinApp.constant('cubeConfig', {
     {attr: 'last_build_time', name: 'Last Build Time'},
     {attr: 'owner', name: 'Owner'},
     {attr: 'create_time', name: 'Create Time'}
-  ]
+  ],
+  streamingAutoGenerateMeasure:['year_start_ts','month_start_ts','day_start_ts','hour_start_ts','min_start_ts']
 });



[5/5] incubator-kylin git commit: KYLIN-966, validate cube exists or not when create (patch from @nichunen)

Posted by zh...@apache.org.
KYLIN-966, validate cube exists or not when create (patch from @nichunen)


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/aeac6337
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/aeac6337
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/aeac6337

Branch: refs/heads/2.x-staging
Commit: aeac6337b208426c6a6b84760c19503bdbe62cbf
Parents: 53b383d
Author: jiazhong <ji...@ebay.com>
Authored: Thu Oct 22 18:05:12 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Thu Oct 22 18:06:02 2015 +0800

----------------------------------------------------------------------
 webapp/app/js/controllers/cubeSchema.js | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/aeac6337/webapp/app/js/controllers/cubeSchema.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index b714c41..6d215ce 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -18,7 +18,7 @@
 
 'use strict';
 
-KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager,StreamingService) {
+KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager,StreamingService,CubeService) {
     $scope.modelsManager = modelsManager;
     $scope.cubesManager = cubesManager;
     $scope.projects = [];
@@ -38,6 +38,7 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
 
     $scope.curStep = $scope.wizardSteps[0];
 
+    $scope.allCubes = [];
 
 
   // ~ init
@@ -45,6 +46,18 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
         $scope.state = {mode: "view"};
     }
 
+  var queryParam = {offset: 0, limit: 65535};
+
+  CubeService.list(queryParam, function (all_cubes) {
+    if($scope.allCubes.length > 0){
+      $scope.allCubes.splice(0,$scope.allCubes.length);
+    }
+
+    for (var i = 0; i < all_cubes.length; i++) {
+      $scope.allCubes.push(all_cubes[i].name.toUpperCase());
+    }
+  });
+
     $scope.$watch('cubeMetaFrame', function (newValue, oldValue) {
         if(!newValue){
             return;
@@ -191,7 +204,11 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
     };
 
   $scope.check_cube_info = function(){
-
+    if(($scope.state.mode === "edit") &&($scope.allCubes.indexOf($scope.cubeMetaFrame.name.toUpperCase()) >= 0)){
+      SweetAlert.swal('Oops...', "The cube named [" + $scope.cubeMetaFrame.name.toUpperCase() + "] already exists", 'warning');
+      return false;
+    }
+    return true;
   }
 
     $scope.check_cube_dimension = function(){


[2/5] incubator-kylin git commit: KYLIN-1041, Streaming UI

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/model/cubeListModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/cubeListModel.js b/webapp/app/js/model/cubeListModel.js
index 7145092..5a2d1b2 100755
--- a/webapp/app/js/model/cubeListModel.js
+++ b/webapp/app/js/model/cubeListModel.js
@@ -17,8 +17,8 @@
 */
 
 KylinApp.service('CubeList',function(CubeService,$q,AccessService){
-    var cubes=[];
     var _this = this;
+    this.cubes=[];
 
     this.list = function(queryParam){
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/model/streamingListModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/streamingListModel.js b/webapp/app/js/model/streamingListModel.js
new file mode 100644
index 0000000..113ffe6
--- /dev/null
+++ b/webapp/app/js/model/streamingListModel.js
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+KylinApp.service('StreamingList', function (CubeService, $q, AccessService, StreamingService) {
+  var _this = this;
+  this.streamingConfigs = [];
+  this.kafkaConfigs = [];
+  // Caches filled by list() and read by checkCubeExist()/getKafkaConfig().
+  // list(): resolves with "success" after both the streaming and the Kafka request complete.
+  this.list = function () {
+    var defer = $q.defer();
+
+    var streamingPromises = [];
+    var kafkaPromises = [];
+
+
+    kafkaPromises.push(StreamingService.getKfkConfig({}, function (kfkConfigs) {
+      _this.kafkaConfigs = kfkConfigs;
+    },function(){
+      defer.reject("Failed to load models");
+    }).$promise);
+
+    streamingPromises.push(StreamingService.getConfig({}, function (streamings) {
+      _this.streamingConfigs = streamings;
+    },function(){
+      defer.reject("Failed to load models");
+    }).$promise);
+    // $q.all takes ONE array; a second argument is ignored, so merge both lists.
+    $q.all(streamingPromises.concat(kafkaPromises)).then(function(){
+      defer.resolve("success");
+    },function(){
+      defer.resolve("failed");
+    })
+    return defer.promise;
+
+  };
+  // Returns {streaming, exist}: the first streaming config whose cubeName matches, if any.
+  this.checkCubeExist = function(cubeName){
+    var result = {streaming:null,exist:false};
+    for(var i=0;i<_this.streamingConfigs.length;i++){
+      if(_this.streamingConfigs[i].cubeName == cubeName){
+        result ={
+          streaming:_this.streamingConfigs[i],
+          exist:true
+        }
+        break;
+      }
+    }
+    return result;
+  }
+  // Returns the Kafka config named kfkName, or undefined when none matches.
+  this.getKafkaConfig = function(kfkName){
+      for(var i=0;i<_this.kafkaConfigs.length;i++) {
+        if(_this.kafkaConfigs[i].name == kfkName){
+          return _this.kafkaConfigs[i];
+        }
+      }
+    }
+  // Drops both cached lists.
+  this.removeAll = function () {
+    _this.streamingConfigs = [];
+    _this.kafkaConfigs = [];
+  };
+
+});

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/model/streamingModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/streamingModel.js b/webapp/app/js/model/streamingModel.js
new file mode 100644
index 0000000..fa2d20c
--- /dev/null
+++ b/webapp/app/js/model/streamingModel.js
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+KylinApp.service('StreamingModel', function () {
+
+  //
+  this.createStreamingConfig = function () {
+    var streamingConfig = {
+      "name": "",
+      "iiName": "",
+      "cubeName": ""
+    };
+
+    return streamingConfig;
+  };
+
+  this.createKafkaConfig = function () {
+    var kafkaConfig = {
+      "name": "",
+      "topic": "",
+      "timeout": "60000",
+      "maxReadCount": "1000",
+      "bufferSize": "65536",
+      "parserName": "org.apache.kylin.streaming.TimedJsonStreamParser",
+      "margin": "300000",
+      "clusters":[],
+      "parserProperties":""
+      }
+
+    return kafkaConfig;
+  }
+
+  this.createKafkaCluster = function () {
+      var kafkaCluster = {
+          "brokers":[]
+      }
+
+      return kafkaCluster;
+  }
+
+  this.createBrokerConfig = function () {
+        var brokerConfig = {
+            "id":'',
+            "host":'',
+            "port":''
+        }
+
+      return brokerConfig;
+  }
+
+})

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/services/streaming.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/streaming.js b/webapp/app/js/services/streaming.js
new file mode 100644
index 0000000..9da4394
--- /dev/null
+++ b/webapp/app/js/services/streaming.js
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+KylinApp.factory('StreamingService', ['$resource', function ($resource, config) {
+    return $resource(Config.service.url + 'streaming/:streamingId/:propName/:propValue/:action', {}, {
+        list: {method: 'GET', params: {}, isArray: true},
+        'getConfig': {method: 'GET',params: {action:'getConfig'},isArray:true},
+        'getKfkConfig': {method: 'GET',params: {action:'getKfkConfig'},isArray:true},
+        drop: {method: 'DELETE', params: {}, isArray: false},
+        save: {method: 'POST', params: {}, isArray: false},
+        update: {method: 'PUT', params: {}, isArray: false}
+    });
+}]);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubeDesigner/filter.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/filter.html b/webapp/app/partials/cubeDesigner/filter.html
deleted file mode 100644
index 118c4aa..0000000
--- a/webapp/app/partials/cubeDesigner/filter.html
+++ /dev/null
@@ -1,66 +0,0 @@
-<!--
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
--->
-
-<div ng-controller="CubeFilterCtrl">
-<div class="row">
-    <div class="col-xs-8">
-        <!--Cube Designer-->
-        <div class="form-group" ng-if="state.mode=='edit'"
-             style="font-family:'Monaco', 'Menlo', 'Ubuntu Mono', 'Consolas', 'source-code-pro'">
-            <label for="filter_condition"
-                   style="color: #930f80;"><b>WHERE</b></label>
-            <textarea id="filter_condition" type="text"
-                    style="height:150px"
-                    class="form-control box-default"
-                    placeholder="Please input WHERE clause without typing 'WHERE'"
-                    ng-model="metaModel.model.filter_condition">
-            </textarea>
-        </div>
-        <!--Cube Detail-->
-        <div class="form-group row" ng-if="state.mode=='view'"
-             style="font-family:'Monaco', 'Menlo', 'Ubuntu Mono', 'Consolas', 'source-code-pro'">
-            <div ng-if="metaModel.model.filter_condition" class="col-md-11 col-md-offset-1">
-                <p style="color: #930f80;"><b>WHERE</b></p>
-                <span>{{metaModel.model.filter_condition}}</span>
-            </div>
-            <div ng-if="!metaModel.model.filter_condition" no-result text="No Filter."></div>
-        </div>
-    </div>
-
-    <!--Tips-->
-    <div class="col-xs-4">
-        <div class="box box-solid">
-            <div class="box-header">
-                <h4 class="box-title">Tips</h4>
-            </div>
-            <div class="box-body">
-                <div class="row">
-                    <div class="col-xs-12">
-                        <ol class="text-info">
-                            <li>Where clause to filter data from source</li>
-                            <li>Do not include date column which will be used for incremental refresh</li>
-                            <li>Do not include "Where"</li>
-                            <li>Please verify SQL when finish cube design from SQL view of cube</li>
-                        </ol>
-                    </div>
-                </div>
-            </div>
-        </div>
-    </div>
-</div>
-</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubeDesigner/info.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/info.html b/webapp/app/partials/cubeDesigner/info.html
index f680487..3389c4a 100644
--- a/webapp/app/partials/cubeDesigner/info.html
+++ b/webapp/app/partials/cubeDesigner/info.html
@@ -20,7 +20,7 @@
     <div class="col-xs-8">
         <ng-form name="forms.cube_info_form" novalidate>
             <!--Project-->
-            <div class="form-group required">
+            <div class="form-group" ng-class="{'required':state.mode=='edit'}">
                 <div class="row">
                   <label class="col-xs-12 col-sm-3 control-label no-padding-right">
                     <b>Model Name</b>
@@ -40,7 +40,7 @@
             </div>
 
             <!--Cube Name-->
-            <div class="form-group required">
+            <div class="form-group" ng-class="{'required':state.mode=='edit'}">
                 <div class="row">
                     <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default">
                         <b>Cube Name</b>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubeDesigner/kafkaAdvancedConfig.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/kafkaAdvancedConfig.html b/webapp/app/partials/cubeDesigner/kafkaAdvancedConfig.html
new file mode 100644
index 0000000..c0b2b39
--- /dev/null
+++ b/webapp/app/partials/cubeDesigner/kafkaAdvancedConfig.html
@@ -0,0 +1,165 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+<div ng-controller="kafkaConfigCtrl">
+  <ng-form name="forms.kafka_ad_config_form" novalidate>
+    <accordion>
+
+      <accordion-group is-open="state.isStreamingAdOpen" ng-init="state.isStreamingAdOpen=true">
+        <accordion-heading>
+          Advanced Setting
+          <i class="pull-right glyphicon"
+             ng-class="{'glyphicon-chevron-down': state.isStreamingAdOpen, 'glyphicon-chevron-right': !state.isStreamingAdOpen}"></i>
+        </accordion-heading>
+
+        <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+          <div class="row">
+            <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+              <b>Timeout</b>
+            </label>
+            <!-- The input's name must equal the control key used in the ng-class / ng-show expressions below. -->
+            <div class="col-xs-12 col-sm-6"
+                 ng-class="{'has-error':forms.kafka_ad_config_form.timeout.$invalid && (forms.kafka_ad_config_form.timeout.$dirty||forms.kafka_ad_config_form.$submitted)}">
+              <input ng-if="state.mode=='edit'" name="timeout" required ng-model="kafkaMeta.timeout" type="text"
+                     placeholder="Input kafkaConfig timeout"
+                     class="form-control"/>
+              <small class="help-block"
+                     ng-show="forms.kafka_ad_config_form.timeout.$error.required && (forms.kafka_ad_config_form.timeout.$dirty||forms.kafka_ad_config_form.$submitted)">
+                Kafka timeout is required.
+              </small>
+              <span ng-if="state.mode=='view'">{{kafkaMeta.timeout}}</span>
+            </div>
+          </div>
+        </div>
+
+        <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+          <div class="row">
+            <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+              <b>Max Read Count</b>
+            </label>
+            <!-- The input's name must equal the control key used in the ng-class / ng-show expressions below. -->
+            <div class="col-xs-12 col-sm-6"
+                 ng-class="{'has-error':forms.kafka_ad_config_form.maxReadCount.$invalid && (forms.kafka_ad_config_form.maxReadCount.$dirty||forms.kafka_ad_config_form.$submitted)}">
+              <input ng-if="state.mode=='edit'" name="maxReadCount" required ng-model="kafkaMeta.maxReadCount" type="text"
+                     placeholder="Input kafkaConfig maxReadCount"
+                     class="form-control"/>
+              <small class="help-block"
+                     ng-show="forms.kafka_ad_config_form.maxReadCount.$error.required && (forms.kafka_ad_config_form.maxReadCount.$dirty||forms.kafka_ad_config_form.$submitted)">
+                Kafka maxReadCount is required.
+              </small>
+              <span ng-if="state.mode=='view'">{{kafkaMeta.maxReadCount}}</span>
+            </div>
+          </div>
+        </div>
+
+        <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+          <div class="row">
+            <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+              <b>Buffer Size</b>
+            </label>
+            <!-- The input's name must equal the control key used in the ng-class / ng-show expressions below. -->
+            <div class="col-xs-12 col-sm-6"
+                 ng-class="{'has-error':forms.kafka_ad_config_form.bufferSize.$invalid && (forms.kafka_ad_config_form.bufferSize.$dirty||forms.kafka_ad_config_form.$submitted)}">
+              <input ng-if="state.mode=='edit'" name="bufferSize" required ng-model="kafkaMeta.bufferSize" type="text"
+                     placeholder="Input kafkaConfig bufferSize"
+                     class="form-control"/>
+              <small class="help-block"
+                     ng-show="forms.kafka_ad_config_form.bufferSize.$error.required && (forms.kafka_ad_config_form.bufferSize.$dirty||forms.kafka_ad_config_form.$submitted)">
+                Kafka bufferSize is required.
+              </small>
+              <span ng-if="state.mode=='view'">{{kafkaMeta.bufferSize}}</span>
+            </div>
+          </div>
+        </div>
+
+        <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+          <div class="row">
+            <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+              <b>Margin</b>
+            </label>
+            <!-- The input's name must equal the control key used in the ng-class / ng-show expressions below. -->
+            <div class="col-xs-12 col-sm-6"
+                 ng-class="{'has-error':forms.kafka_ad_config_form.margin.$invalid && (forms.kafka_ad_config_form.margin.$dirty||forms.kafka_ad_config_form.$submitted)}">
+              <input ng-if="state.mode=='edit'" name="margin" required ng-model="kafkaMeta.margin" type="text"
+                     placeholder="Input kafkaConfig margin"
+                     class="form-control"/>
+              <small class="help-block"
+                     ng-show="forms.kafka_ad_config_form.margin.$error.required && (forms.kafka_ad_config_form.margin.$dirty||forms.kafka_ad_config_form.$submitted)">
+                Kafka margin is required.
+              </small>
+              <span ng-if="state.mode=='view'">{{kafkaMeta.margin}}</span>
+            </div>
+          </div>
+        </div>
+      </accordion-group>
+    </accordion>
+
+    <hr/>
+    <accordion>
+
+      <accordion-group is-open="state.isParserHeaderOpen">
+        <accordion-heading>
+          Parser Setting
+          <i class="pull-right glyphicon"
+             ng-class="{'glyphicon-chevron-down': state.isParserHeaderOpen, 'glyphicon-chevron-right': !state.isParserHeaderOpen}"></i>
+        </accordion-heading>
+
+        <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+          <div class="row">
+            <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+              <b>Parser Name</b>
+            </label>
+            <!-- The input's name must equal the control key used in the ng-class / ng-show expressions below. -->
+            <div class="col-xs-12 col-sm-6"
+                 ng-class="{'has-error':forms.kafka_ad_config_form.parserName.$invalid && (forms.kafka_ad_config_form.parserName.$dirty||forms.kafka_ad_config_form.$submitted)}">
+              <input ng-if="state.mode=='edit'" name="parserName" required ng-model="kafkaMeta.parserName" type="text"
+                     placeholder="Input kafkaConfig parserName"
+                     class="form-control"/>
+              <small class="help-block"
+                     ng-show="forms.kafka_ad_config_form.parserName.$error.required && (forms.kafka_ad_config_form.parserName.$dirty||forms.kafka_ad_config_form.$submitted)">
+                Kafka parserName is required.
+              </small>
+              <span ng-if="state.mode=='view'">{{kafkaMeta.parserName}}</span>
+            </div>
+          </div>
+        </div>
+
+        <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+          <div class="row">
+            <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+              <b>Parser Properties</b>
+            </label>
+            <!-- The input's name must equal the control key used in the ng-class / ng-show expressions below. -->
+            <div class="col-xs-12 col-sm-6"
+                 ng-class="{'has-error':forms.kafka_ad_config_form.parserProperties.$invalid && (forms.kafka_ad_config_form.parserProperties.$dirty||forms.kafka_ad_config_form.$submitted)}">
+              <input ng-if="state.mode=='edit'" name="parserProperties" required ng-model="kafkaMeta.parserProperties" type="text"
+                     placeholder="configA=1;configB=2"
+                     class="form-control"/>
+              <small class="help-block"
+                     ng-show="forms.kafka_ad_config_form.parserProperties.$error.required && (forms.kafka_ad_config_form.parserProperties.$dirty||forms.kafka_ad_config_form.$submitted)">
+                Parser Properties is required.
+              </small>
+              <span ng-if="state.mode=='view'">{{kafkaMeta.parserProperties}}</span>
+            </div>
+          </div>
+        </div>
+      </accordion-group>
+    </accordion>
+
+  </ng-form>
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubeDesigner/kafkaBasicConfig.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/kafkaBasicConfig.html b/webapp/app/partials/cubeDesigner/kafkaBasicConfig.html
new file mode 100644
index 0000000..d9ca5b1
--- /dev/null
+++ b/webapp/app/partials/cubeDesigner/kafkaBasicConfig.html
@@ -0,0 +1,114 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+<div ng-controller="kafkaConfigCtrl">
+  <ng-form name="forms.kafka_config_form" novalidate>
+    <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+      <div class="row">
+        <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+          <b>Topic</b>
+        </label><!-- the input's name must equal the control key (topic) used in the expressions below -->
+        <div class="col-xs-12 col-sm-6"   ng-class="{'has-error':forms.kafka_config_form.topic.$invalid && (forms.kafka_config_form.topic.$dirty||forms.kafka_config_form.$submitted)}">
+          <input  ng-if="state.mode=='edit'"  name="topic" required ng-model="kafkaMeta.topic" type="text"
+                  placeholder="Input kafkaConfig topic"
+                  class="form-control"/>
+          <small class="help-block" ng-show="forms.kafka_config_form.topic.$error.required && (forms.kafka_config_form.topic.$dirty||forms.kafka_config_form.$submitted)">Kafka topic is required.</small>
+          <span ng-if="state.mode=='view'">{{kafkaMeta.topic}}</span>
+        </div>
+      </div>
+    </div>
+
+
+
+
+    <div ng-repeat="cluster in kafkaMeta.clusters | filter: state.measureFilter track by $index" class="box">
+      <div class="box-header">
+        <h3 class="box-title">Cluster-{{$index+1}}</h3><!-- the remove button is offered in edit mode only, like the other mutating controls -->
+        <button type="button" ng-if="state.mode=='edit'" ng-click="removeCluster(cluster)" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">×</span></button>
+      </div>
+      <div class="box-body no-padding">
+        <table class="table table-condensed" ng-if="cluster.brokers.length||cluster.newBroker">
+          <thead>
+          <tr>
+            <th>ID</th>
+            <th>Host</th>
+            <th>Port</th>
+            <th ng-if="state.mode=='edit'">Actions</th>
+          </tr>
+          </thead>
+          <tbody>
+          <tr ng-repeat="broker in cluster.brokers| filter: state.measureFilter track by $index">
+            <td>{{broker.id}}</td>
+            <td>{{broker.host}}</td>
+            <td>{{broker.port}}</td>
+            <td ng-if="state.mode=='edit'">
+              <!--Edit Button -->
+              <button class="btn btn-xs btn-info" ng-click="addBroker(cluster,broker)">
+                <i class="fa fa-pencil"></i>
+              </button>
+
+              <button class="btn btn-xs btn-danger" ng-click="removeElement(cluster,broker)"><i class="fa fa-trash-o"></i>
+              </button>
+            </td>
+          </tr>
+          <tr ng-if="cluster.newBroker">
+            <td>
+              <div class="input-group">
+                <input class="form-control" type="text" ng-model="cluster.newBroker.id" name="broker_id" placeholder="Input broker ID"/>
+              </div>
+            </td>
+            <td>
+              <div class="input-group">
+                <input class="form-control" type="text" ng-model="cluster.newBroker.host" name="broker_host" placeholder="Input broker host">
+              </div>
+            </td>
+            <td>
+              <div class="input-group">
+                <input class="form-control" type="text" ng-model="cluster.newBroker.port" name="broker_port" placeholder="Input broker port">
+              </div>
+            </td>
+            <td>
+              <button class="btn btn-xs btn-danger" ng-click="removeNewBroker(cluster)"><i class="fa fa-trash-o"></i>
+              </button>
+            </td>
+          </tr>
+          </tbody>
+        </table>
+      </div>
+      <div class="box-footer">
+        <div><!-- edit-mode actions: start a new broker row, or save / cancel the pending one -->
+          <button class="btn btn-sm btn-success" ng-click="addBroker(cluster)" ng-show="state.mode=='edit'&&!cluster.newBroker">
+            <i class="fa fa-plus"></i> Broker
+          </button>
+          <button class="btn btn-sm btn-info" ng-click="saveNewBroker(cluster)" ng-show="state.mode=='edit'&&cluster.newBroker">
+            <i class="fa fa-save"></i> Save
+          </button>
+          <button class="btn btn-link" ng-click="clearNewBroker(cluster)"  ng-show="state.mode=='edit'&&cluster.newBroker">Cancel</button>
+        </div>
+      </div>
+    </div>
+    <!--Add Cluster Button-->
+    <div class="form-group">
+      <button class="btn btn-sm btn-info" ng-click="addCluster()" ng-show="state.mode=='edit'">
+        <i class="fa fa-plus"></i> Cluster
+      </button>
+    </div>
+
+
+  </ng-form>
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubeDesigner/measures.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/measures.html b/webapp/app/partials/cubeDesigner/measures.html
index e4962dc..ece2b7d 100755
--- a/webapp/app/partials/cubeDesigner/measures.html
+++ b/webapp/app/partials/cubeDesigner/measures.html
@@ -17,186 +17,188 @@
 -->
 
 <!-- Measures Summary -->
-<ng-form name="forms.cube_measure_form">
-    <div class="dataTables_wrapper form-inline no-footer" ng-if="cubeMetaFrame.measures.length > 0">
-            <table class="table table-striped table-hover">
-                <thead>
-                    <tr>
-                        <th>ID</th>
-                        <th>Name</th>
-                        <th>Expression</th>
-                        <th>Param Type</th>
-                        <th>Param Value</th>
-                        <th>Return Type</th>
-                        <th ng-if="state.mode=='edit'">Actions</th>
-                    </tr>
-                </thead>
-                <tbody>
-                    <tr ng-repeat="measure in cubeMetaFrame.measures | filter: state.measureFilter track by $index">
-                        <td>
-                            <!--ID -->
-                            <b>{{measure.id = ($index + 1)}}</b>
-                        </td>
-                        <td>
-                            <!--Name -->
-                            <span tooltip="measure name..">{{measure.name}}</span>
-                        </td>
-                        <td>
-                            <!--Expression -->
-                            <span>{{measure.function.expression}}</span>
-                        </td>
-                        <td>
-                            <!--Param Type -->
-                            <span>{{measure.function.parameter.type}}</span>
-                        </td>
-                        <td>
-                            <!--Param Value -->
-                            <span>{{measure.function.parameter.value}}</span>
-                        </td>
-                        <td>
-                            <!--Return Type -->
-                            <span>{{measure.function.returntype}}</span>
-                        </td>
-                        <td ng-if="state.mode=='edit'">
-                            <!--Edit Button -->
-                            <button class="btn btn-xs btn-info" ng-click="addNewMeasure(measure)">
-                                <i class="fa fa-pencil"></i>
-                            </button>
-                            <!--Remove Button -->
-                            <button class="btn btn-xs  btn-danger" ng-click="removeElement(cubeMetaFrame.measures, measure)">
-                                <i class="fa fa-trash-o"></i>
-                            </button>
-                        </td>
-                    </tr>
-                </tbody>
-            </table>
-    </div>
-</ng-form>
+<div ng-controller="CubeMeasuresCtrl">
+  <ng-form name="forms.cube_measure_form">
+      <div class="dataTables_wrapper form-inline no-footer" ng-if="cubeMetaFrame.measures.length > 0">
+              <table class="table table-striped table-hover">
+                  <thead>
+                      <tr>
+                          <th>ID</th>
+                          <th>Name</th>
+                          <th>Expression</th>
+                          <th>Param Type</th>
+                          <th>Param Value</th>
+                          <th>Return Type</th>
+                          <th ng-if="state.mode=='edit'">Actions</th>
+                      </tr>
+                  </thead>
+                  <tbody>
+                      <tr ng-repeat="measure in cubeMetaFrame.measures | filter: state.measureFilter track by $index">
+                          <td>
+                              <!--ID -->
+                              <b>{{measure.id = ($index + 1)}}</b>
+                          </td>
+                          <td>
+                              <!--Name -->
+                              <span tooltip="measure name..">{{measure.name}}</span>
+                          </td>
+                          <td>
+                              <!--Expression -->
+                              <span>{{measure.function.expression}}</span>
+                          </td>
+                          <td>
+                              <!--Param Type -->
+                              <span>{{measure.function.parameter.type}}</span>
+                          </td>
+                          <td>
+                              <!--Param Value -->
+                              <span>{{measure.function.parameter.value}}</span>
+                          </td>
+                          <td>
+                              <!--Return Type -->
+                              <span>{{measure.function.returntype}}</span>
+                          </td>
+                          <td ng-if="state.mode=='edit'">
+                              <!--Edit Button -->
+                              <button class="btn btn-xs btn-info" ng-click="addNewMeasure(measure)">
+                                  <i class="fa fa-pencil"></i>
+                              </button>
+                              <!--Remove Button -->
+                              <button class="btn btn-xs  btn-danger" ng-click="removeElement(cubeMetaFrame.measures, measure)">
+                                  <i class="fa fa-trash-o"></i>
+                              </button>
+                          </td>
+                      </tr>
+                  </tbody>
+              </table>
+      </div>
+  </ng-form>
 
-<!--Add Measures Button-->
-<div class="form-group">
-    <button class="btn btn-sm btn-info" ng-click="addNewMeasure()" ng-show="state.mode=='edit' && !newMeasure">
-        <i class="fa fa-plus"></i> Measure
-    </button>
-</div>
+  <!--Add Measures Button-->
+  <div class="form-group">
+      <button class="btn btn-sm btn-info" ng-click="addNewMeasure()" ng-show="state.mode=='edit' && !newMeasure">
+          <i class="fa fa-plus"></i> Measure
+      </button>
+  </div>
 
-<!--Edit Measure-->
-<ng-form name="edit_mes_form">
-<div class="box box-solid" ng-if="newMeasure">
-    <div class="box-header">
-        <h4 class="box-title text-info">Edit Measure</h4>
-    </div>
-    <div class="box-body">
-        <div class="row">
-            <div class="col-xs-8">
-                    <!--Name-->
-                    <div class="form-group">
-                        <div class="row">
-                            <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Name</b></label>
-                            <div class="col-xs-12 col-sm-6">
-                                <input type="text" placeholder="Name.." class="form-control"
-                                    tooltip="measure name.." tooltip-trigger="focus"
-                                    ng-model="newMeasure.name" required />
-                            </div>
-                        </div>
-                    </div>
-                    <!--Expression-->
-                    <div class="form-group">
-                        <div class="row">
-                            <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Expression</b></label>
-                            <div class="col-xs-12 col-sm-6">
-                                <select class="form-control"
-                                    ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression" chosen ng-model="newMeasure.function.expression" required
-                                    ng-change="measureReturnTypeUpdate();"
-                                    ng-options="me as me for me in cubeConfig.measureExpressions">
-                                    <option value=""></option>
-                                </select>
-                            </div>
-                        </div>
-                    </div>
-                    <!--Param Type-->
-                    <div class="form-group">
-                        <div class="row">
-                            <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Param Type</b></label>
-                            <div class="col-xs-12 col-sm-6">
-                                <select class="form-control" ng-if="newMeasure.function.expression != 'COUNT'"
-                                    ng-init="newMeasure.function.parameter.type=(!!newMeasure.function.parameter.type)?newMeasure.function.parameter.type: 'column' "
-                                    chosen ng-model="newMeasure.function.parameter.type" required
-                                    ng-change="measureReturnTypeUpdate();"
-                                    ng-options="mpt as mpt for mpt in cubeConfig.measureParamType">
-                                    <option value=""></option>
-                                </select>
-                                <span class="font-color-default"
-                                       ng-if="newMeasure.function.expression == 'COUNT'"
-                                       ng-init="newMeasure.function.parameter.type= 'constant' "><b>&nbsp;&nbsp;constant</b>
-                                </span>
-                            </div>
-                        </div>
-                    </div>
-                    <!--Param Value-->
-                    <div class="form-group">
-                        <div class="row">
-                            <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Param Value</b></label>
-                            <div class="col-xs-12 col-sm-6">
-                                <span class="font-color-default"
-                                    ng-if="newMeasure.function.parameter.type == 'constant'"
-                                    ng-init="newMeasure.function.parameter.value = 1"><b>&nbsp;&nbsp;1</b></span>
-                                <select class="form-control" chosen
-                                    ng-if="newMeasure.function.parameter.type == 'column'"
-                                    ng-model="newMeasure.function.parameter.value"
-                                    ng-change="measureReturnTypeUpdate();"
-                                    ng-options="columns.name as columns.name for columns in getMetricColumnsByTable(metaModel.model.fact_table)" >
-                                    <option value="">-- Select a Fact Table Column --</option>
-                                </select>
-                            </div>
-                        </div>
-                    </div>
-                    <!--Return Type-->
-                    <div class="form-group">
-                        <div class="row">
-                            <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Return Type</b></label>
-                            <div class="col-xs-12 col-sm-6">
-                                <select class="form-control"
-                                    ng-if="newMeasure.function.expression == 'COUNT_DISTINCT'"
-                                    ng-init="newMeasure.function.returntype = (!!newMeasure.function.returntype)?newMeasure.function.returntype:cubeConfig.dftSelections.distinctDataType.value"
-                                    chosen ng-model="newMeasure.function.returntype" required
-                                    ng-options="ddt.value as ddt.name for ddt in cubeConfig.distinctDataTypes">
-                                    <option value=""></option>
-                                </select>
-                                <span class="font-color-default"
-                                      ng-if="newMeasure.function.expression != 'COUNT_DISTINCT'"
-                                     ><b>&nbsp;&nbsp;{{newMeasure.function.returntype | uppercase}}</b>
-                                </span>
-                            </div>
-                        </div>
-                    </div>
-            </div>
+  <!--Edit Measure-->
+  <ng-form name="edit_mes_form">
+  <div class="box box-solid" ng-if="newMeasure">
+      <div class="box-header">
+          <h4 class="box-title text-info">Edit Measure</h4>
+      </div>
+      <div class="box-body">
+          <div class="row">
+              <div class="col-xs-8">
+                      <!--Name-->
+                      <div class="form-group">
+                          <div class="row">
+                              <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Name</b></label>
+                              <div class="col-xs-12 col-sm-6">
+                                  <input type="text" placeholder="Name.." class="form-control"
+                                      tooltip="measure name.." tooltip-trigger="focus"
+                                      ng-model="newMeasure.name" required />
+                              </div>
+                          </div>
+                      </div>
+                      <!--Expression-->
+                      <div class="form-group">
+                          <div class="row">
+                              <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Expression</b></label>
+                              <div class="col-xs-12 col-sm-6">
+                                  <select class="form-control"
+                                      ng-init="newMeasure.function.expression = (!!newMeasure.function.expression)?newMeasure.function.expression:cubeConfig.dftSelections.measureExpression" chosen ng-model="newMeasure.function.expression" required
+                                      ng-change="measureReturnTypeUpdate();"
+                                      ng-options="me as me for me in cubeConfig.measureExpressions">
+                                      <option value=""></option>
+                                  </select>
+                              </div>
+                          </div>
+                      </div>
+                      <!--Param Type-->
+                      <div class="form-group">
+                          <div class="row">
+                              <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Param Type</b></label>
+                              <div class="col-xs-12 col-sm-6">
+                                  <select class="form-control" ng-if="newMeasure.function.expression != 'COUNT'"
+                                      ng-init="newMeasure.function.parameter.type=(!!newMeasure.function.parameter.type)?newMeasure.function.parameter.type: 'column' "
+                                      chosen ng-model="newMeasure.function.parameter.type" required
+                                      ng-change="measureReturnTypeUpdate();"
+                                      ng-options="mpt as mpt for mpt in cubeConfig.measureParamType">
+                                      <option value=""></option>
+                                  </select>
+                                  <span class="font-color-default"
+                                         ng-if="newMeasure.function.expression == 'COUNT'"
+                                         ng-init="newMeasure.function.parameter.type= 'constant' "><b>&nbsp;&nbsp;constant</b>
+                                  </span>
+                              </div>
+                          </div>
+                      </div>
+                      <!--Param Value-->
+                      <div class="form-group">
+                          <div class="row">
+                              <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Param Value</b></label>
+                              <div class="col-xs-12 col-sm-6">
+                                  <span class="font-color-default"
+                                      ng-if="newMeasure.function.parameter.type == 'constant'"
+                                      ng-init="newMeasure.function.parameter.value = 1"><b>&nbsp;&nbsp;1</b></span>
+                                  <select class="form-control" chosen
+                                      ng-if="newMeasure.function.parameter.type == 'column'"
+                                      ng-model="newMeasure.function.parameter.value"
+                                      ng-change="measureReturnTypeUpdate();"
+                                      ng-options="columns.name as columns.name for columns in getMetricColumnsByTable(metaModel.model.fact_table)" >
+                                      <option value="">-- Select a Fact Table Column --</option>
+                                  </select>
+                              </div>
+                          </div>
+                      </div>
+                      <!--Return Type-->
+                      <div class="form-group">
+                          <div class="row">
+                              <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default"><b>Return Type</b></label>
+                              <div class="col-xs-12 col-sm-6">
+                                  <select class="form-control"
+                                      ng-if="newMeasure.function.expression == 'COUNT_DISTINCT'"
+                                      ng-init="newMeasure.function.returntype = (!!newMeasure.function.returntype)?newMeasure.function.returntype:cubeConfig.dftSelections.distinctDataType.value"
+                                      chosen ng-model="newMeasure.function.returntype" required
+                                      ng-options="ddt.value as ddt.name for ddt in cubeConfig.distinctDataTypes">
+                                      <option value=""></option>
+                                  </select>
+                                  <span class="font-color-default"
+                                        ng-if="newMeasure.function.expression != 'COUNT_DISTINCT'"
+                                       ><b>&nbsp;&nbsp;{{newMeasure.function.returntype | uppercase}}</b>
+                                  </span>
+                              </div>
+                          </div>
+                      </div>
+              </div>
 
-            <!--Tips-->
-            <div class="col-xs-4">
-                <div class="box box-solid">
-                    <div class="box-header">
-                        <h4 class="box-title">Tips</h4>
-                    </div>
-                    <div class="box-body">
-                        <div class="row">
-                            <div class="col-xs-12">
-                                <ol class="text-info">
-                                    <li>All cubes have to contain one measure for Count(1), suggest use "_Count_" as name (Has been generated automatically)</li>
-                                    <li>Only accept single column in param value with "Column" type</li>
-                                    <li>Distinct Count is approximate, please indicate Error Rate, higher accuracy degree accompanied with larger storage size and longer build time</li>
-                                </ol>
-                            </div>
-                        </div>
-                    </div>
-                </div>
-            </div>
-        </div>
-    </div>
-    <div class="box-footer">
-        <button class="btn btn-sm btn-info" ng-disabled="edit_mes_form.$invalid"
-                ng-click="saveNewMeasure()" ng-show="state.mode=='edit'">OK</button>
-        <button class="btn btn-link" ng-click="clearNewMeasure()">Cancel</button>
-    </div>
+              <!--Tips-->
+              <div class="col-xs-4">
+                  <div class="box box-solid">
+                      <div class="box-header">
+                          <h4 class="box-title">Tips</h4>
+                      </div>
+                      <div class="box-body">
+                          <div class="row">
+                              <div class="col-xs-12">
+                                  <ol class="text-info">
+                                      <li>All cubes have to contain one measure for Count(1), suggest use "_Count_" as name (Has been generated automatically)</li>
+                                      <li>Only accept single column in param value with "Column" type</li>
+                                      <li>Distinct Count is approximate, please indicate Error Rate, higher accuracy degree accompanied with larger storage size and longer build time</li>
+                                  </ol>
+                              </div>
+                          </div>
+                      </div>
+                  </div>
+              </div>
+          </div>
+      </div>
+      <div class="box-footer">
+          <button class="btn btn-sm btn-info" ng-disabled="edit_mes_form.$invalid"
+                  ng-click="saveNewMeasure()" ng-show="state.mode=='edit'">OK</button>
+          <button class="btn btn-link" ng-click="clearNewMeasure()">Cancel</button>
+      </div>
+  </div>
+  </ng-form>
 </div>
-</ng-form>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubeDesigner/streamingConfig.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/streamingConfig.html b/webapp/app/partials/cubeDesigner/streamingConfig.html
new file mode 100644
index 0000000..f8ab61d
--- /dev/null
+++ b/webapp/app/partials/cubeDesigner/streamingConfig.html
@@ -0,0 +1,282 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+<div ng-controller="streamingConfigCtrl">
+  <ng-form name="forms.cube_streaming_form" novalidate>
+    <div class="form-group">
+      <div class="row">
+        <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default">
+          <b>Is this cube for streaming use?</b>
+        </label>
+        <div class="col-xs-12 col-sm-6" ng-if="state.mode=='edit'" >
+          <toggle-switch ng-model="cubeState.isStreaming" on-label="YES" off-label="NO"></toggle-switch>
+        </div>
+        <div class="col-xs-12 col-sm-6" ng-if="state.mode=='view'" >
+            <span>{{(!!cubeState.isStreaming)?'YES':'NO'}}</span>
+        </div>
+      </div>
+    </div>
+    <!--Streaming settings below are rendered only when cubeState.isStreaming is truthy-->
+
+    <div ng-if="cubeState.isStreaming">
+      <accordion>
+        <accordion-group is-open="state.isKfkSettingOpen" ng-init="state.isKfkSettingOpen=true">
+          <accordion-heading>
+            Kafka Setting
+            <i class="pull-right glyphicon"
+               ng-class="{'glyphicon-chevron-down': state.isKfkSettingOpen, 'glyphicon-chevron-right': !state.isKfkSettingOpen}"></i>
+          </accordion-heading>
+          <!--Topic. NOTE(review): "$sbumitted" is not a flag AngularJS sets itself; confirm the controller sets it on the form under this spelling-->
+          <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+            <div class="row">
+              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+                <b>Topic</b>
+              </label>
+              <div class="col-xs-12 col-sm-6"   ng-class="{'has-error':forms.cube_streaming_form.topic.$invalid && (forms.cube_streaming_form.topic.$dirty||forms.cube_streaming_form.$sbumitted)}">
+                <input  ng-if="state.mode=='edit'"  name="topic" required ng-model="kafkaMeta.topic" type="text"
+                        placeholder="Input kafkaConfig topic"
+                        class="form-control"/>
+                <small class="help-block" ng-show="forms.cube_streaming_form.topic.$error.required && (forms.cube_streaming_form.topic.$dirty||forms.cube_streaming_form.$sbumitted)">Kafka topic is required.</small>
+                <span ng-if="state.mode=='view'">{{kafkaMeta.topic}}</span>
+              </div>
+            </div>
+          </div>
+
+      <div ng-repeat="cluster in kafkaMeta.clusters | filter: state.measureFilter track by $index" class="box">
+        <div class="box-header">
+          <h3 class="box-title">Cluster-{{$index+1}}</h3>
+          <button ng-if="state.mode=='edit'" type="button" ng-click="removeCluster(cluster)" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">×</span></button>
+        </div>
+        <div class="box-body no-padding">
+          <table class="table table-condensed" ng-if="cluster.brokers.length||cluster.newBroker">
+            <thead>
+            <tr>
+              <th>ID</th>
+              <th>Host</th>
+              <th>Port</th>
+              <th ng-if="state.mode=='edit'">Actions</th>
+            </tr>
+            </thead>
+            <tbody>
+            <tr ng-repeat="broker in cluster.brokers| filter: state.measureFilter track by $index">
+              <td>{{broker.id}}</td>
+              <td>{{broker.host}}</td>
+              <td>{{broker.port}}</td>
+              <td ng-if="state.mode=='edit'">
+                <!--Edit Button -->
+                <button class="btn btn-xs btn-info" ng-click="addBroker(cluster,broker)">
+                  <i class="fa fa-pencil"></i>
+                </button>
+
+                <button class="btn btn-xs btn-danger" ng-click="removeElement(cluster,broker)"><i class="fa fa-trash-o"></i>
+                </button>
+              </td>
+            </tr>
+            <tr ng-if="cluster.newBroker">
+              <td>
+                <div class="input-group">
+                  <input class="form-control" type="text" ng-model="cluster.newBroker.id" name="broker_id" placeholder="Input broker ID"/>
+                </div>
+              </td>
+              <td>
+                <div class="input-group">
+                  <input class="form-control" type="text" ng-model="cluster.newBroker.host" name="broker_host" placeholder="Input broker host">
+                </div>
+              </td>
+              <td>
+                <div class="input-group">
+                  <input class="form-control" type="text" ng-model="cluster.newBroker.port" name="broker_port" placeholder="Input broker port">
+                </div>
+              </td>
+              <td>
+                <button class="btn btn-xs btn-danger" ng-click="removeNewBroker(cluster)"><i class="fa fa-trash-o"></i>
+                </button>
+              </td>
+            </tr>
+            </tbody>
+          </table>
+        </div>
+        <div class="box-footer"> <!--Broker actions: add a broker, or save/cancel the pending one-->
+          <div>
+            <button class="btn btn-sm btn-success" ng-click="addBroker(cluster)" ng-show="state.mode=='edit'&&!cluster.newBroker">
+              <i class="fa fa-plus"></i> Broker
+            </button>
+            <button class="btn btn-sm btn-info" ng-click="saveNewBroker(cluster)" ng-show="state.mode=='edit'&&cluster.newBroker">
+              <i class="fa fa-save"></i> Save
+            </button>
+            <button class="btn btn-link" ng-click="clearNewBroker(cluster)"  ng-show="state.mode=='edit'&&cluster.newBroker">Cancel</button>
+          </div>
+        </div>
+      </div>
+      <!--Add Cluster Button (shown in edit mode only)-->
+      <div class="form-group">
+        <button class="btn btn-sm btn-info" ng-click="addCluster()" ng-show="state.mode=='edit'">
+          <i class="fa fa-plus"></i> Cluster
+        </button>
+      </div>
+
+          </accordion-group>
+        </accordion>
+
+      <hr/>
+      <!--Advanced setting-->
+      <accordion>
+        <accordion-group is-open="state.isStreamingAdOpen" ng-init="state.isStreamingAdOpen=true">
+          <accordion-heading>
+            Advanced Setting
+            <i class="pull-right glyphicon"
+               ng-class="{'glyphicon-chevron-down': state.isStreamingAdOpen, 'glyphicon-chevron-right': !state.isStreamingAdOpen}"></i>
+          </accordion-heading>
+          <!--Timeout: the input name must match the key read from forms.cube_streaming_form-->
+          <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+            <div class="row">
+              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+                <b>Timeout</b>
+              </label>
+
+              <div class="col-xs-12 col-sm-6"
+                   ng-class="{'has-error':forms.cube_streaming_form.timeout.$invalid && (forms.cube_streaming_form.timeout.$dirty||forms.cube_streaming_form.$sbumitted)}">
+                <input ng-if="state.mode=='edit'" name="timeout" required ng-model="kafkaMeta.timeout" type="text"
+                       placeholder="Input kafkaConfig timeout"
+                       class="form-control"/>
+                <small class="help-block"
+                       ng-show="forms.cube_streaming_form.timeout.$error.required && (forms.cube_streaming_form.timeout.$dirty||forms.cube_streaming_form.$sbumitted)">
+                  Kafka timeout is required.
+                </small>
+                <span ng-if="state.mode=='view'">{{kafkaMeta.timeout}}</span>
+              </div>
+            </div>
+          </div>
+          <!--Max Read Count: the input name must match the key read from forms.cube_streaming_form-->
+          <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+            <div class="row">
+              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+                <b>Max Read Count</b>
+              </label>
+
+              <div class="col-xs-12 col-sm-6"
+                   ng-class="{'has-error':forms.cube_streaming_form.maxReadCount.$invalid && (forms.cube_streaming_form.maxReadCount.$dirty||forms.cube_streaming_form.$sbumitted)}">
+                <input ng-if="state.mode=='edit'" name="maxReadCount" required ng-model="kafkaMeta.maxReadCount" type="text"
+                       placeholder="Input kafkaConfig maxReadCount"
+                       class="form-control"/>
+                <small class="help-block"
+                       ng-show="forms.cube_streaming_form.maxReadCount.$error.required && (forms.cube_streaming_form.maxReadCount.$dirty||forms.cube_streaming_form.$sbumitted)">
+                  Kafka maxReadCount is required.
+                </small>
+                <span ng-if="state.mode=='view'">{{kafkaMeta.maxReadCount}}</span>
+              </div>
+            </div>
+          </div>
+          <!--Buffer Size: the input name must match the key read from forms.cube_streaming_form-->
+          <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+            <div class="row">
+              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+                <b>Buffer Size</b>
+              </label>
+
+              <div class="col-xs-12 col-sm-6"
+                   ng-class="{'has-error':forms.cube_streaming_form.bufferSize.$invalid && (forms.cube_streaming_form.bufferSize.$dirty||forms.cube_streaming_form.$sbumitted)}">
+                <input ng-if="state.mode=='edit'" name="bufferSize" required ng-model="kafkaMeta.bufferSize" type="text"
+                       placeholder="Input kafkaConfig bufferSize"
+                       class="form-control"/>
+                <small class="help-block"
+                       ng-show="forms.cube_streaming_form.bufferSize.$error.required && (forms.cube_streaming_form.bufferSize.$dirty||forms.cube_streaming_form.$sbumitted)">
+                  Kafka bufferSize is required.
+                </small>
+                <span ng-if="state.mode=='view'">{{kafkaMeta.bufferSize}}</span>
+              </div>
+            </div>
+          </div>
+          <!--Margin: the input name must match the key read from forms.cube_streaming_form-->
+          <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+            <div class="row">
+              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+                <b>Margin</b>
+              </label>
+
+              <div class="col-xs-12 col-sm-6"
+                   ng-class="{'has-error':forms.cube_streaming_form.margin.$invalid && (forms.cube_streaming_form.margin.$dirty||forms.cube_streaming_form.$sbumitted)}">
+                <input ng-if="state.mode=='edit'" name="margin" required ng-model="kafkaMeta.margin" type="text"
+                       placeholder="Input kafkaConfig margin"
+                       class="form-control"/>
+                <small class="help-block"
+                       ng-show="forms.cube_streaming_form.margin.$error.required && (forms.cube_streaming_form.margin.$dirty||forms.cube_streaming_form.$sbumitted)">
+                  Kafka margin is required.
+                </small>
+                <span ng-if="state.mode=='view'">{{kafkaMeta.margin}}</span>
+              </div>
+            </div>
+          </div>
+        </accordion-group>
+      </accordion>
+
+      <hr/>
+      <accordion>
+
+        <accordion-group is-open="state.isParserHeaderOpen">
+          <accordion-heading>
+            Parser Setting
+            <i class="pull-right glyphicon"
+               ng-class="{'glyphicon-chevron-down': state.isParserHeaderOpen, 'glyphicon-chevron-right': !state.isParserHeaderOpen}"></i>
+          </accordion-heading>
+          <!--Parser Name: the input name must match the key read from forms.cube_streaming_form-->
+          <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+            <div class="row">
+              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+                <b>Parser Name</b>
+              </label>
+
+              <div class="col-xs-12 col-sm-6"
+                   ng-class="{'has-error':forms.cube_streaming_form.parserName.$invalid && (forms.cube_streaming_form.parserName.$dirty||forms.cube_streaming_form.$sbumitted)}">
+                <input ng-if="state.mode=='edit'" name="parserName" required ng-model="kafkaMeta.parserName" type="text"
+                       placeholder="Input kafkaConfig parserName"
+                       class="form-control"/>
+                <small class="help-block"
+                       ng-show="forms.cube_streaming_form.parserName.$error.required && (forms.cube_streaming_form.parserName.$dirty||forms.cube_streaming_form.$sbumitted)">
+                  Kafka parserName is required.
+                </small>
+                <span ng-if="state.mode=='view'">{{kafkaMeta.parserName}}</span>
+              </div>
+            </div>
+          </div>
+          <!--Parser Properties: the input name must match the key read from forms.cube_streaming_form-->
+          <div class="form-group" ng-class="{'required':state.mode=='edit'}">
+            <div class="row">
+              <label class="col-xs-12 col-sm-3 control-label no-padding-right">
+                <b>Parser Properties</b>
+              </label>
+
+              <div class="col-xs-12 col-sm-6"
+                   ng-class="{'has-error':forms.cube_streaming_form.parserProperties.$invalid && (forms.cube_streaming_form.parserProperties.$dirty||forms.cube_streaming_form.$sbumitted)}">
+                <input ng-if="state.mode=='edit'" name="parserProperties" required ng-model="kafkaMeta.parserProperties" type="text"
+                       placeholder="configA=1;configB=2"
+                       class="form-control"/>
+                <small class="help-block"
+                       ng-show="forms.cube_streaming_form.parserProperties.$error.required && (forms.cube_streaming_form.parserProperties.$dirty||forms.cube_streaming_form.$sbumitted)">
+                  Parser Properties is required.
+                </small>
+                <span ng-if="state.mode=='view'">{{kafkaMeta.parserProperties}}</span>
+              </div>
+            </div>
+          </div>
+        </accordion-group>
+      </accordion>
+
+    </div>
+  </ng-form>
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubes/cube_detail.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubes/cube_detail.html b/webapp/app/partials/cubes/cube_detail.html
index 48a22d7..49e2578 100755
--- a/webapp/app/partials/cubes/cube_detail.html
+++ b/webapp/app/partials/cubes/cube_detail.html
@@ -46,19 +46,17 @@
             ng-if="userService.hasRole('ROLE_ADMIN')">
             <a href="" ng-click="cube.visiblePage='hbase';getHbaseInfo(cube)">HBase</a>
         </li>
-
     </ul>
 
     <div class="cube-detail" ng-if="!cube.visiblePage || cube.visiblePage=='metadata'">
         <div ng-include="'partials/cubes/cube_schema.html'" ng-controller="CubeSchemaCtrl"
-             ng-init="state={mode:'view', cubeName:cube.name}"></div>
+             ng-init="state={mode:'view', cubeName:cube.name};cubeState={isStreaming:(!!cube.streaming)?true:false}"></div>
     </div>
 
     <div ng-show="cube.visiblePage=='sql'" class="cube-detail">
         <div ng-if="cube.sql">
             <pre style="background-color: white;border: 0px">{{cube.sql}}</pre>
         </div>
-
         <div ng-if="!cube.sql">
             <span calss="text-info">No SQL GENERATED.</span>
         </div>
@@ -101,7 +99,7 @@
         </div>
     </div>
 
-    <div class="cube-detail" ng-show="cube.visiblePage=='hbase'">
+  <div class="cube-detail" ng-show="cube.visiblePage=='hbase'">
         <div style="margin: 15px;">
             <div ng-repeat="table in cube.hbase">
                 <h5><b>HTable:</b> {{table.tableName}}</h5>
@@ -115,10 +113,12 @@
             <div ng-if="cube.hbase">
                 <div class="hr hr8 hr-double hr-dotted"></div>
                 <h5><b>Total Size:</b> <span class="red">{{cube.totalSize | bytes:2}}</span></h5>
+                <h5><b>Total Number:</b> <span class="red">{{cube.hbase.length}}</span></h5>
             </div>
             <div ng-if="cube.hbase.length == 0">
                 <h5>No HBase Info.</h5>
             </div>
         </div>
     </div>
-</div>
+  </div>
+

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/cubes/cubes.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubes/cubes.html b/webapp/app/partials/cubes/cubes.html
index 3c8c2e7..fdf3277 100644
--- a/webapp/app/partials/cubes/cubes.html
+++ b/webapp/app/partials/cubes/cubes.html
@@ -45,6 +45,7 @@
             </th>
             <th>Actions</th>
             <th ng-if="userService.hasRole('ROLE_ADMIN')">Admins</th>
+            <th></th>
         </tr>
         </thead>
         <!--Body-->
@@ -83,6 +84,7 @@
                             <a ng-click="dropCube(cube)" tooltip="Drop the cube, related jobs and data permanently.">Drop</a></li>
                         <li ng-if="cube.status=='DISABLED' && (userService.hasRole('ROLE_ADMIN') || hasPermission(cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))">
                             <a ng-click="cubeEdit(cube);">Edit</a></li>
+                      <li ng-if="cube.streaming && cube.status=='DISABLED' && (userService.hasRole('ROLE_ADMIN') || hasPermission(cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))">
                         <li><a ng-click="startJobSubmit(cube);">Build</a></li>
                         <li><a ng-click="startRefresh(cube)">Refresh</a></li>
                         <li><a ng-click="startMerge(cube)">Merge</a></li>
@@ -106,6 +108,11 @@
                     </ul>
                 </div>
             </td>
+            <td ng-if="cube.streaming">
+              <label class="badge label-info" style="cursor:pointer;">STREAMING</label>
+            </td>
+          <td ng-if="!cube.streaming">
+          </td>
         </tr>
         <tr ng-show="cube.showDetail">
             <td colspan="9" style="padding: 10px 30px 10px 30px;">

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/modelDesigner/conditions_settings.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/modelDesigner/conditions_settings.html b/webapp/app/partials/modelDesigner/conditions_settings.html
index 7a0d463..6e836ee 100644
--- a/webapp/app/partials/modelDesigner/conditions_settings.html
+++ b/webapp/app/partials/modelDesigner/conditions_settings.html
@@ -132,7 +132,11 @@
                     <div class="col-xs-12">
                         <ol class="text-info">
                             <li>Partition date column not required,leave as default if cube always need full build</li>
-                            <li>Partition date column will select 'date' or 'string' type column from fact table</li>
+                            <li>Partition date column must be chosen from the 'date' type columns of the fact table</li>
+                            <li>The where clause filters the data loaded from the source</li>
+                            <li>Do not include the date column used for incremental refresh in the where clause</li>
+                            <li>Do not include the "Where" keyword itself</li>
+                            <li>Once the cube design is finished, please verify the SQL in the SQL view of the cube</li>
                         </ol>
                     </div>
                 </div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/modelDesigner/data_model.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/modelDesigner/data_model.html b/webapp/app/partials/modelDesigner/data_model.html
index e100287..e1cae39 100644
--- a/webapp/app/partials/modelDesigner/data_model.html
+++ b/webapp/app/partials/modelDesigner/data_model.html
@@ -20,7 +20,7 @@
     <ng-form name="forms.data_model_form">
 
     <!-- Fact Table Name -->
-    <div class="form-group required">
+    <div class="form-group" ng-class="{'required':state.mode=='edit'}">
         <div class="row">
             <label class="col-xs-12 col-sm-2 control-label concube.detailtrol-label no-padding-right font-color-default">
                 <b>Fact Table</b>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/modelDesigner/model_info.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/modelDesigner/model_info.html b/webapp/app/partials/modelDesigner/model_info.html
index aed4325..a290bfb 100644
--- a/webapp/app/partials/modelDesigner/model_info.html
+++ b/webapp/app/partials/modelDesigner/model_info.html
@@ -22,7 +22,7 @@
         <ng-form name="forms.model_info_form" novalidate>
 
             <!--Model Name-->
-            <div class="form-group required">
+            <div class="form-group" ng-class="{'required':state.mode=='edit'}">
                 <div class="row">
                     <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default">
                         <b>Model Name</b>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/models/models_tree.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/models/models_tree.html b/webapp/app/partials/models/models_tree.html
index 722d65f..9d86c50 100644
--- a/webapp/app/partials/models/models_tree.html
+++ b/webapp/app/partials/models/models_tree.html
@@ -44,10 +44,6 @@
             <a href="models/add"  ng-if="userService.hasRole('ROLE_MODELER')"><i class="fa fa-star"></i> New Model</a>
           </li>
 
-          <li ng-if="userService.hasRole('ROLE_ADMIN')">
-            <a href="streaming/add"  ng-if="userService.hasRole('ROLE_MODELER')"><i class="fa fa-area-chart"></i>New Streaming</a>
-          </li>
-
         </ul>
       </div>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/streaming/streaming_edit.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/streaming/streaming_edit.html b/webapp/app/partials/streaming/streaming_edit.html
new file mode 100644
index 0000000..d4b4926
--- /dev/null
+++ b/webapp/app/partials/streaming/streaming_edit.html
@@ -0,0 +1,34 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+<div class="row">
+    <div class="col-xs-3">
+      <div ng-include src="'partials/models/models_tree.html'" ng-controller="ModelsCtrl"></div>
+      <!--<div ng-include src="'partials/tables/source_table_tree.html'" ng-controller="SourceMetaCtrl"></div>-->
+    </div>
+    <div class="col-xs-9">
+        <form role="form" name="cube_form" novalidate>
+            <!-- This margin in order to align with table tree in left part -->
+            <div style="margin-top: 20px;" ng-controller="CubeEditCtrl">
+                <div ng-include="'partials/streaming/streaming_schema.html'" ng-controller="StreamingSchemaCtrl">
+                </div>
+            </div>
+        </form>
+    </div>
+</div>
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/streaming/streaming_schema.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/streaming/streaming_schema.html b/webapp/app/partials/streaming/streaming_schema.html
new file mode 100644
index 0000000..a45904c
--- /dev/null
+++ b/webapp/app/partials/streaming/streaming_schema.html
@@ -0,0 +1,63 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+<!--hide when view model and no model selected-->
+<div class="box box-primary model-design box-2px" ng-init="model">
+    <div class="box-header widget-header-blue widget-header-flat">
+        <h4 class="box-title text-info">Streaming Designer</h4>
+    </div>
+    <div class="box-body">
+        <div>
+            <ul class="wizard-steps">
+                <li ng-repeat="step in wizardSteps"
+                    class="{{step==curStep?'active':''}} {{step.isComplete?'complete':''}}">
+                    <span style="cursor:pointer;" ng-click="checkForm()?goToStep($index):''" class="step">{{step.step = ($index + 1)}}</span>
+                    <span class="title">{{step.title}}</span>
+                </li>
+            </ul>
+        </div>
+        <hr/>
+        <div class="step-content pos-rel" id="step-container">
+            <div ng-include src="curStep.src"></div>
+        </div>
+        <hr/>
+        <div class="wizard-actions">
+            <div class="row">
+                <div class="col-xs-8" style="display:block;">
+                    <div>
+                    </div>
+                </div>
+                <div class="col-xs-4 pull-right">
+                    <button class="btn btn-prev" ng-click="preView()" ng-show="curStep.title!='Streaming Config'">
+                        <i class="ace-icon fa fa-arrow-left"></i>
+                        Prev
+                    </button>
+                    <button id="nextButton" class="btn btn-success btn-next" ng-disabled="forms[curStep.form].$invalid" ng-click="checkForm()?nextView():''"
+                            ng-show="curStep.title!='Advanced Settings'">
+                        Next
+                        <i class="ace-icon fa fa-arrow-right icon-on-right"></i>
+                    </button>
+                    <button class="btn btn-primary"  ng-click="checkForm()?saveStreaming():''"
+                            ng-if="curStep.title=='Advanced Settings' && state.mode=='edit'">
+                        Save
+                    </button>
+                </div>
+            </div>
+        </div>
+    </div>
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/tables/source_table_tree.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/source_table_tree.html b/webapp/app/partials/tables/source_table_tree.html
index ff95d10..767eb43 100755
--- a/webapp/app/partials/tables/source_table_tree.html
+++ b/webapp/app/partials/tables/source_table_tree.html
@@ -45,3 +45,5 @@
         <div no-result ng-if="tableModel.selectedSrcDb.length == 0"></div>
     </div>
 </div>
+
+<div ng-include="'partials/tables/table_load.html'"></div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/tables/table_detail.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html
index e692269..b232375 100644
--- a/webapp/app/partials/tables/table_detail.html
+++ b/webapp/app/partials/tables/table_detail.html
@@ -199,117 +199,4 @@
     </div>
   </div>
 
-
-  <script type="text/ng-template" id="addHiveTable.html">
-    <div class="modal-header">
-      <h4>Load Hive Table Metadata</h4>
-    </div>
-    <div class="modal-body">
-      <span><strong>Project: </strong>{{ $parent.projectName!=null?$parent.projectName:'NULL'}}</span>
-      <label for="tables"> Table Names:(Seperate with comma)</label>
-            <textarea ng-model="$parent.tableNames" class="form-control" id="tables"
-                      placeholder="table1,table2  By default,system will choose 'Default' as database,you can specify database like this 'database.table'"></textarea>
-    </div>
-    <div class="modal-footer">
-      <button class="btn btn-primary" ng-click="add()">Sync</button>
-      <button class="btn btn-primary" ng-click="cancel()">Cancel</button>
-    </div>
-  </script>
-
-  <script type="text/ng-template" id="addStreamingSource.html">
-    <div class="modal-header">
-      <h2>Create Streaming Table Schema</h2>
-    </div>
-
-    <div class="modal-body streaming-source" style="height: 480px;">
-      <div class="col-xs-5">
-        <p class="text-info">
-          Need to input streaming source record here, will detect the source schema and create a table schema for
-          streaming.
-        </p>
-
-        <div style="padding:15px;" class="has-error">
-          <small class="help-block" ng-show="streaming.sourceSchema==''&&form.setStreamingSchema.$sbumitted">Please
-            input Streaming source record to generate schema.
-          </small>
-        </div>
-        <div style="margin-bottom: 20px;">
-          <span class="label label-info">JSON</span>
-        </div>
-        <div ng-model="streaming.sourceSchema" ui-ace="{
-    useWrapMode : true,
-    mode:'json',
-    onLoad: streamingOnLoad
-  }">
-
-        </div>
-      </div>
-      <div class="col-xs-1" style="margin-top:300px;text-align:center;">
-        <button type="button" class="btn btn-primary" ng-click="streamingOnChange()"><i
-          class="fa fa-angle-double-right fa-5" style="font-size:2em;"></i></button>
-      </div>
-      <div class="col-xs-6" ng-show="table.sourceValid">
-        <ol class="text-info" style="margin-bottom: 30px;">
-          <li>Choose one 'timestamp' type column for streaming table.</li>
-          <li>Uncheck the 'timestamp' type column which will not be used.</li>
-        </ol>
-        <form class="form-horizontal" name="form.setStreamingSchema" novalidate>
-          <div class="form-group required">
-            <label class="col-xs-4 control-label" style="text-align: left;">Table Name</label>
-
-            <div class="col-xs-8"
-                 ng-class="{'has-error':form.setStreamingSchema.streamingObject.$invalid && (form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$sbumitted)}">
-              <input type="text" name="streamingObject" required="" ng-model="table.name" class="form-control"/>
-              <small class="help-block"
-                     ng-show="form.setStreamingSchema.streamingObject.$error.required&&(form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$sbumitted)">
-                Table name is required.
-              </small>
-            </div>
-          </div>
-        </form>
-        <table class="table table-hover table-bordered">
-          <tr>
-            <th>Check As Column</th>
-            <th>Column</th>
-            <th>Column Type</th>
-            <th>Comment</th>
-          </tr>
-          <tr ng-repeat="column in columnList">
-            <td><label style="width:100%;cursor: pointer;" for="{{column.name}}"><input style="width:1em;height:1em;"
-                                                                                        type="checkbox"
-                                                                                        id="{{column.name}}"
-                                                                                        ng-model="column.checked"
-                                                                                        ng-true-value="Y"
-                                                                                        ng-false-value="N"/></label>
-            </td>
-            <td>{{column.name}}</td>
-            <td>
-              <select chosen ng-model="column.type"
-                      ng-options="type as type for type in tableConfig.dataTypes"
-                      data-placeholder="select a column type"
-                      style="width: 200px !important;"
-                      class="chosen-select">
-              </select>
-            </td>
-            <td>
-              <label ng-if="column.type=='timestamp'&&column.fromSource=='Y'" class="badge badge-info">TIMESTAMP</label>
-              <label ng-if="column.fromSource=='N'" class="badge badge-info">AUTO APPEND</label>
-            </td>
-          </tr>
-        </table>
-
-        <div class="has-error" ng-if="rule.timestampColumnConflict">
-          <small class="help-block">
-            You should choose one, and only one 'timestamp' type column generated from source schema.
-          </small>
-        </div>
-      </div>
-    </div>
-    <div class="modal-footer">
-      <button class="btn btn-primary" ng-click="syncStreamingSchema()" ng-disabled="form.setStreamingSchema.$invalid">
-        Submit
-      </button>
-      <button class="btn btn-primary" ng-click="cancel()">Cancel</button>
-    </div>
-  </script>
 </div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/partials/tables/table_load.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/table_load.html b/webapp/app/partials/tables/table_load.html
new file mode 100644
index 0000000..99c993d
--- /dev/null
+++ b/webapp/app/partials/tables/table_load.html
@@ -0,0 +1,130 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+  <script type="text/ng-template" id="addHiveTable.html">
+    <div class="modal-header">
+      <h4>Load Hive Table Metadata</h4>
+    </div>
+    <div class="modal-body">
+      <span><strong>Project: </strong>{{ $parent.projectName!=null?$parent.projectName:'NULL'}}</span>
+      <label for="tables"> Table Names:(Separate with comma)</label>
+            <textarea ng-model="$parent.tableNames" class="form-control" id="tables"
+                      placeholder="table1,table2  By default, the system uses 'Default' as the database; you can specify a database like this: 'database.table'"></textarea>
+    </div>
+    <div class="modal-footer">
+      <button class="btn btn-primary" ng-click="add()">Sync</button>
+      <button class="btn btn-primary" ng-click="cancel()">Cancel</button>
+    </div>
+  </script>
+
+  <script type="text/ng-template" id="addStreamingSource.html">
+    <div class="modal-header">
+      <h2>Create Streaming Table Schema</h2>
+    </div>
+
+    <div class="modal-body streaming-source" style="height: 660px;">
+      <div class="col-xs-5">
+        <p class="text-info">
+          Enter a sample streaming source record here; its schema will be detected and used to create a table schema for
+          streaming.
+        </p>
+
+        <div style="padding:15px;" class="has-error">
+          <small class="help-block" ng-show="streaming.sourceSchema==''&&form.setStreamingSchema.$sbumitted">Please
+            input Streaming source record to generate schema.
+          </small>
+        </div>
+        <div style="margin-bottom: 20px;">
+          <span class="label label-info">JSON</span>
+        </div>
+        <div ng-model="streaming.sourceSchema" ui-ace="{
+    useWrapMode : true,
+    mode:'json',
+    onLoad: streamingOnLoad
+  }">
+
+        </div>
+      </div>
+      <div class="col-xs-1" style="margin-top:300px;text-align:center;">
+        <button type="button" class="btn btn-primary" ng-click="streamingOnChange()"><i
+          class="fa fa-angle-double-right fa-5" style="font-size:2em;"></i></button>
+      </div>
+      <div class="col-xs-6" ng-show="table.sourceValid">
+        <ol class="text-info" style="margin-bottom: 30px;">
+          <li>Choose one 'timestamp' type column for streaming table.</li>
+          <li>Uncheck the 'timestamp' type column which will not be used.</li>
+        </ol>
+        <form class="form-horizontal" name="form.setStreamingSchema" novalidate>
+          <div class="form-group required">
+            <label class="col-xs-4 control-label" style="text-align: left;">Table Name</label>
+
+            <div class="col-xs-8"
+                 ng-class="{'has-error':form.setStreamingSchema.streamingObject.$invalid && (form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$sbumitted)}">
+              <input type="text" name="streamingObject" required="" ng-model="table.name" class="form-control"/>
+              <small class="help-block"
+                     ng-show="form.setStreamingSchema.streamingObject.$error.required&&(form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$sbumitted)">
+                Table name is required.
+              </small>
+            </div>
+          </div>
+        </form>
+        <table class="table table-hover table-bordered">
+          <tr>
+            <th>Check As Column</th>
+            <th>Column</th>
+            <th>Column Type</th>
+            <th>Comment</th>
+          </tr>
+          <tr ng-repeat="column in columnList">
+            <td><label style="width:100%;cursor: pointer;" for="{{column.name}}"><input style="width:1em;height:1em;"
+                                                                                        type="checkbox"
+                                                                                        id="{{column.name}}"
+                                                                                        ng-model="column.checked"
+                                                                                        ng-true-value="Y"
+                                                                                        ng-false-value="N"/></label>
+            </td>
+            <td>{{column.name}}</td>
+            <td>
+              <select chosen ng-model="column.type"
+                      ng-options="type as type for type in tableConfig.dataTypes"
+                      data-placeholder="select a column type"
+                      style="width: 200px !important;"
+                      class="chosen-select">
+              </select>
+            </td>
+            <td>
+              <label ng-if="column.type=='timestamp'&&column.fromSource=='Y'" class="badge badge-info">TIMESTAMP</label>
+              <label ng-if="column.fromSource=='N'" class="badge badge-info">AUTO APPEND</label>
+            </td>
+          </tr>
+        </table>
+
+        <div class="has-error" ng-if="rule.timestampColumnConflict">
+          <small class="help-block">
+            You should choose one, and only one 'timestamp' type column generated from source schema.
+          </small>
+        </div>
+      </div>
+    </div>
+    <div class="modal-footer">
+      <button class="btn btn-primary" ng-click="syncStreamingSchema()" ng-disabled="form.setStreamingSchema.$invalid">
+        Submit
+      </button>
+      <button class="btn btn-primary" ng-click="cancel()">Cancel</button>
+    </div>
+  </script>
\ No newline at end of file