You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by zh...@apache.org on 2015/10/22 12:06:56 UTC

[4/5] incubator-kylin git commit: KYLIN-1041, Streaming UI

KYLIN-1041, Streaming UI


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/53b383d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/53b383d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/53b383d9

Branch: refs/heads/2.x-staging
Commit: 53b383d9dc814b9fa53feae2adbbdd99e83be6b1
Parents: a8ead00
Author: jiazhong <ji...@ebay.com>
Authored: Fri Sep 25 17:46:11 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Thu Oct 22 18:05:59 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/Broadcaster.java    |    2 +-
 .../org/apache/kylin/cube/CubeDescManager.java  |    2 +-
 .../kylin/engine/streaming/StreamingConfig.java |    9 +
 .../engine/streaming/StreamingManager.java      |  163 ++-
 .../kylin/rest/controller/CubeController.java   |  219 +++-
 .../kylin/rest/controller/ModelController.java  |    2 +-
 .../rest/controller/StreamingController.java    |  225 ++++
 .../apache/kylin/rest/request/CubeRequest.java  |   26 +
 .../kylin/rest/request/StreamingRequest.java    |   41 +
 .../apache/kylin/rest/service/BasicService.java |   10 +
 .../apache/kylin/rest/service/CacheService.java |  152 ++-
 .../kylin/rest/service/KafkaConfigService.java  |   94 ++
 .../kylin/rest/service/StreamingService.java    |   90 ++
 server/src/main/resources/kylinSecurity.xml     |    2 +
 .../kylin/source/kafka/KafkaConfigManager.java  |  140 +-
 .../kylin/source/kafka/config/KafkaConfig.java  |    9 +
 webapp/app/index.html                           |   32 +-
 webapp/app/js/app.js                            |    2 +-
 webapp/app/js/controllers/cubeAdvanceSetting.js |   28 +
 webapp/app/js/controllers/cubeEdit.js           | 1216 ++++++++++--------
 webapp/app/js/controllers/cubeFilter.js         |   23 -
 webapp/app/js/controllers/cubeMeasures.js       |   96 ++
 webapp/app/js/controllers/cubeSchema.js         |  130 +-
 webapp/app/js/controllers/cubes.js              |   21 +-
 webapp/app/js/controllers/modelSchema.js        |  429 +++---
 webapp/app/js/controllers/sourceMeta.js         |   10 +-
 webapp/app/js/controllers/streamingConfig.js    |   75 ++
 .../app/js/controllers/streamingKafkaConfig.js  |   23 +
 webapp/app/js/model/cubeConfig.js               |    3 +-
 webapp/app/js/model/cubeListModel.js            |    2 +-
 webapp/app/js/model/streamingListModel.js       |   80 ++
 webapp/app/js/model/streamingModel.js           |   66 +
 webapp/app/js/services/streaming.js             |   28 +
 webapp/app/partials/cubeDesigner/filter.html    |   66 -
 webapp/app/partials/cubeDesigner/info.html      |    4 +-
 .../cubeDesigner/kafkaAdvancedConfig.html       |  165 +++
 .../partials/cubeDesigner/kafkaBasicConfig.html |  114 ++
 webapp/app/partials/cubeDesigner/measures.html  |  360 +++---
 .../partials/cubeDesigner/streamingConfig.html  |  282 ++++
 webapp/app/partials/cubes/cube_detail.html      |   10 +-
 webapp/app/partials/cubes/cubes.html            |    7 +
 .../modelDesigner/conditions_settings.html      |    6 +-
 .../app/partials/modelDesigner/data_model.html  |    2 +-
 .../app/partials/modelDesigner/model_info.html  |    2 +-
 webapp/app/partials/models/models_tree.html     |    4 -
 .../app/partials/streaming/streaming_edit.html  |   34 +
 .../partials/streaming/streaming_schema.html    |   63 +
 .../app/partials/tables/source_table_tree.html  |    2 +
 webapp/app/partials/tables/table_detail.html    |  113 --
 webapp/app/partials/tables/table_load.html      |  130 ++
 webapp/bower.json                               |    3 +-
 webapp/grunt.json                               |    2 +
 52 files changed, 3473 insertions(+), 1346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 0b6152f..80ec33c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -138,7 +138,7 @@ public class Broadcaster {
     }
 
     public enum TYPE {
-        ALL("all"), CUBE("cube"), CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
+        ALL("all"), CUBE("cube"),STREAMING("streaming"),KAFKA("kafka"),CUBE_DESC("cube_desc"), PROJECT("project"), INVERTED_INDEX("inverted_index"), INVERTED_INDEX_DESC("ii_desc"), TABLE("table"), DATA_MODEL("data_model"), HYBRID("hybrid");
         private String text;
 
         TYPE(String text) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index b5e3eb2..8e75d29 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -228,7 +228,7 @@ public class CubeDescManager {
      * @return
      * @throws IOException
      */
-    public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
+        public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
         // Validate CubeDesc
         if (desc.getUuid() == null || desc.getName() == null) {
             throw new IllegalArgumentException();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index 3b3b4f7..a7c6da0 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -41,6 +41,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
 
@@ -98,6 +99,14 @@ public class StreamingConfig extends RootPersistentEntity {
         this.margin = margin;
     }
 
+    public String getResourcePath() {
+        return concatResourcePath(name);
+    }
+
+    public static String concatResourcePath(String streamingName) {
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json";
+    }
+
     @Override
     public StreamingConfig clone() {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 0e94b04..112379e 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -40,15 +40,17 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,10 +70,21 @@ public class StreamingManager {
     // static cached instances
     private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
 
+    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+
     private KylinConfig config;
 
-    private StreamingManager(KylinConfig config) {
+    // name ==> StreamingConfig
+    private CaseInsensitiveStringCache<StreamingConfig> streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(Broadcaster.TYPE.STREAMING);
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
+        reloadAllStreaming();
     }
 
     private ResourceStore getStore() {
@@ -89,12 +102,16 @@ public class StreamingManager {
             if (r != null) {
                 return r;
             }
+            try{
             r = new StreamingManager(config);
             CACHE.put(config, r);
             if (CACHE.size() > 1) {
-                logger.warn("More than one cubemanager singleton exist");
+                logger.warn("More than one streamingManager singleton exist");
             }
             return r;
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
+        }
         }
     }
 
@@ -114,32 +131,96 @@ public class StreamingManager {
         return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
     }
 
-    public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
-        try {
-            getStore().putResource(formatStreamingConfigPath(name), config, StreamingConfig.SERIALIZER);
-            return true;
-        } catch (IOException e) {
-            logger.error("error save resource name:" + name, e);
-            throw new RuntimeException("error save resource name:" + name, e);
-        }
-    }
 
     public StreamingConfig getStreamingConfig(String name) {
-        try {
-            return getStore().getResource(formatStreamingConfigPath(name), StreamingConfig.class, StreamingConfig.SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get resource name:" + name, e);
-            throw new RuntimeException("error get resource name:" + name, e);
+        return streamingMap.get(name);
+    }
+
+    public List<StreamingConfig> listAllStreaming() {
+        return new ArrayList<>(streamingMap.values());
+    }
+
+
+    /**
+     * Reload StreamingConfig from the resource store. It will be triggered by a
+     * desc update event.
+     *
+     * @param name
+     * @throws IOException
+     */
+    public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
+
+        // Resource path of the StreamingConfig to reload
+        String path = StreamingConfig.concatResourcePath(name);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+
+        // Here replace the old one
+        streamingMap.putLocal(ndesc.getName(), ndesc);
+        return ndesc;
+    }
+
+    // remove streamingConfig
+    public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+        String path = streamingConfig.getResourcePath();
+        getStore().deleteResource(path);
+        streamingMap.remove(streamingConfig.getName());
+    }
+
+
+    public StreamingConfig getConfig(String name) {
+        name = name.toUpperCase();
+        return streamingMap.get(name);
+    }
+
+    public void removeStreamingLocal(String streamingName) {
+        streamingMap.removeLocal(streamingName);
+    }
+
+    /**
+     * Update StreamingConfig with the input. Broadcast the event to the cluster.
+     *
+     * @param desc
+     * @return
+     * @throws IOException
+     */
+    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
+        // Validate StreamingConfig
+        if (desc.getUuid() == null || desc.getName() == null) {
+            throw new IllegalArgumentException();
+        }
+        String name = desc.getName();
+        if (!streamingMap.containsKey(name)) {
+            throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
         }
+
+
+        // Persist the StreamingConfig to the resource store
+        String path = desc.getResourcePath();
+        getStore().putResource(path, desc, STREAMING_SERIALIZER);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+        // Here replace the old one
+        streamingMap.put(ndesc.getName(), desc);
+
+        return ndesc;
     }
 
-    public void saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+
+    public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
         if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
             throw new IllegalArgumentException();
         }
 
+        if (streamingMap.containsKey(streamingConfig.getName()))
+            throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
+
         String path = formatStreamingConfigPath(streamingConfig.getName());
         getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+        streamingMap.put(streamingConfig.getName(),streamingConfig);
+        return streamingConfig;
     }
 
     public long getOffset(String streaming, int shard) {
@@ -199,6 +280,46 @@ public class StreamingManager {
         }
     }
 
+    private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
+        ResourceStore store = getStore();
+        StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
+
+        if (StringUtils.isBlank(streamingDesc.getName())) {
+            throw new IllegalStateException("StreamingConfig name must not be blank");
+        }
+        return streamingDesc;
+    }
+
+    private void reloadAllStreaming() throws IOException {
+        ResourceStore store = getStore();
+        logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
+
+        streamingMap.clear();
+
+        List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            StreamingConfig streamingConfig;
+            try {
+                streamingConfig = loadStreamingConfigAt(path);
+            } catch (Exception e) {
+                logger.error("Error loading streaming desc " + path, e);
+                continue;
+            }
+            if (path.equals(streamingConfig.getResourcePath()) == false) {
+                logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
+                continue;
+            }
+            if (streamingMap.containsKey(streamingConfig.getName())) {
+                logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
+                continue;
+            }
+
+            streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+        }
+
+        logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+    }    
+    
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index d61793d..9ce6452 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -34,6 +34,7 @@ import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.exception.JobException;
@@ -46,10 +47,14 @@ import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CubeRequest;
 import org.apache.kylin.rest.request.CubeSegmentRequest;
 import org.apache.kylin.rest.request.JobBuildRequest;
+import org.apache.kylin.rest.request.StreamingRequest;
 import org.apache.kylin.rest.response.GeneralResponse;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
+import org.apache.kylin.rest.service.KafkaConfigService;
+import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,8 +75,6 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 
 /**
  * CubeController is defined as Restful API entrance for UI.
- *
- * @author jianliu
  */
 @Controller
 @RequestMapping(value = "/cubes")
@@ -79,12 +82,18 @@ public class CubeController extends BasicController {
     private static final Logger logger = LoggerFactory.getLogger(CubeController.class);
 
     @Autowired
+    private StreamingService streamingService;
+
+    @Autowired
+    private KafkaConfigService kafkaConfigService;
+
+    @Autowired
     private CubeService cubeService;
 
     @Autowired
     private JobService jobService;
 
-    @RequestMapping(value = "", method = { RequestMethod.GET })
+    @RequestMapping(value = "", method = {RequestMethod.GET})
     @ResponseBody
     public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "modelName", required = false) String modelName, @RequestParam(value = "projectName", required = false) String projectName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
         return cubeService.getCubes(cubeName, projectName, modelName, limit, offset);
@@ -98,7 +107,7 @@ public class CubeController extends BasicController {
      * @throws UnknownHostException
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { RequestMethod.GET })
+    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = {RequestMethod.GET})
     @ResponseBody
     public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -120,7 +129,7 @@ public class CubeController extends BasicController {
      * @param notifyList
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/notify_list", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/notify_list", method = {RequestMethod.PUT})
     @ResponseBody
     public void updateNotifyList(@PathVariable String cubeName, @RequestBody List<String> notifyList) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -138,7 +147,7 @@ public class CubeController extends BasicController {
 
     }
 
-    @RequestMapping(value = "/{cubeName}/cost", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/cost", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance updateCubeCost(@PathVariable String cubeName, @RequestParam(value = "cost") int cost) {
         try {
@@ -150,7 +159,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/coprocessor", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/coprocessor", method = {RequestMethod.PUT})
     @ResponseBody
     public Map<String, Boolean> updateCubeCoprocessor(@PathVariable String cubeName, @RequestParam(value = "force") String force) {
         try {
@@ -168,7 +177,7 @@ public class CubeController extends BasicController {
      *
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName, @RequestParam(value = "lookupTable") String lookupTable) {
         try {
@@ -186,7 +195,7 @@ public class CubeController extends BasicController {
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/rebuild", method = {RequestMethod.PUT})
     @ResponseBody
     public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest jobBuildRequest) {
         try {
@@ -203,7 +212,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/disable", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance disableCube(@PathVariable String cubeName) {
         try {
@@ -221,7 +230,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/purge", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/purge", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance purgeCube(@PathVariable String cubeName) {
         try {
@@ -239,7 +248,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/enable", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/{cubeName}/enable", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeInstance enableCube(@PathVariable String cubeName) {
         try {
@@ -256,7 +265,7 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}", method = { RequestMethod.DELETE })
+    @RequestMapping(value = "/{cubeName}", method = {RequestMethod.DELETE})
     @ResponseBody
     public void deleteCube(@PathVariable String cubeName) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -264,44 +273,126 @@ public class CubeController extends BasicController {
             throw new NotFoundException("Cube with name " + cubeName + " not found..");
         }
 
+        //drop related StreamingConfig KafkaConfig if exist
+        try {
+            List<StreamingConfig> configs= streamingService.listAllStreamingConfigs(cubeName);
+            for(StreamingConfig config:configs){
+                try {
+                    streamingService.dropStreamingConfig(config);
+                } catch (IOException e) {
+                    logger.error(e.getLocalizedMessage(), e);
+                    throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
+                }
+                try {
+                    KafkaConfig kfkConfig = kafkaConfigService.getKafkaConfig(config.getName());
+                    kafkaConfigService.dropKafkaConfig(kfkConfig);
+                } catch (IOException e) {
+                    throw new InternalErrorException("Failed to delete KafkaConfig. " + " Caused by: " + e.getMessage(), e);
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        //drop Cube
         try {
             cubeService.deleteCube(cube);
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             throw new InternalErrorException("Failed to delete cube. " + " Caused by: " + e.getMessage(), e);
         }
+
     }
 
     /**
-     *save cubeDesc
+     * save cubeDesc
      *
      * @return Table metadata array
      * @throws IOException
      */
-    @RequestMapping(value = "", method = { RequestMethod.POST })
+    @RequestMapping(value = "", method = {RequestMethod.POST})
     @ResponseBody
     public CubeRequest saveCubeDesc(@RequestBody CubeRequest cubeRequest) {
 
         CubeDesc desc = deserializeCubeDesc(cubeRequest);
         if (desc == null) {
+            cubeRequest.setMessage("CubeDesc is null.");
             return cubeRequest;
         }
-
         String name = CubeService.getCubeNameFromDesc(desc.getName());
         if (StringUtils.isEmpty(name)) {
             logger.info("Cube name should not be empty.");
             throw new BadRequestException("Cube name should not be empty.");
         }
 
+        CubeInstance cubeInstance;
+
         try {
             desc.setUuid(UUID.randomUUID().toString());
             String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject();
-            cubeService.createCubeAndDesc(name, projectName, desc);
+            cubeInstance = cubeService.createCubeAndDesc(name, projectName, desc);
         } catch (Exception e) {
             logger.error("Failed to deal with the request.", e);
             throw new InternalErrorException(e.getLocalizedMessage(), e);
         }
 
+        //streaming Cube
+        if (cubeRequest.getStreamingCube().equals("true")) {
+            StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest);
+            KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest);
+            if (streamingConfig == null) {
+                cubeRequest.setMessage("No StreamingConfig info defined.");
+                return cubeRequest;
+            }
+            if (kafkaConfig == null) {
+                cubeRequest.setMessage("No KafkaConfig info defined.");
+                return cubeRequest;
+            }
+            if (StringUtils.isEmpty(streamingConfig.getName())) {
+                logger.info("StreamingConfig Name should not be empty.");
+                throw new BadRequestException("StremingConfig name should not be empty.");
+            }
+
+            if (StringUtils.isEmpty(kafkaConfig.getName())) {
+                logger.info("KafkaConfig Name should not be empty.");
+                throw new BadRequestException("KafkaConfig name should not be empty.");
+            }
+
+            try {
+                streamingConfig.setUuid(UUID.randomUUID().toString());
+                streamingService.createStreamingConfig(streamingConfig);
+
+            } catch (IOException e) {
+                try {
+                    cubeService.deleteCube(cubeInstance);
+                } catch (Exception ex) {
+                    throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex);
+                }
+                logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage());
+            }
+            try {
+                kafkaConfig.setUuid(UUID.randomUUID().toString());
+                kafkaConfigService.createKafkaConfig(kafkaConfig);
+            } catch (IOException e) {
+                try {
+                    streamingService.dropStreamingConfig(streamingConfig);
+                } catch (IOException e1) {
+                    throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage());
+                }
+
+                try {
+                    cubeService.deleteCube(cubeInstance);
+                } catch (Exception ex) {
+                    throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex);
+                }
+                logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
+            }
+
+        }
+
+
         cubeRequest.setUuid(desc.getUuid());
         cubeRequest.setSuccessful(true);
         return cubeRequest;
@@ -314,7 +405,7 @@ public class CubeController extends BasicController {
      * @throws JsonProcessingException
      * @throws IOException
      */
-    @RequestMapping(value = "", method = { RequestMethod.PUT })
+    @RequestMapping(value = "", method = {RequestMethod.PUT})
     @ResponseBody
     public CubeRequest updateCubeDesc(@RequestBody CubeRequest cubeRequest) throws JsonProcessingException {
 
@@ -356,6 +447,46 @@ public class CubeController extends BasicController {
             logger.warn("Cube " + desc.getName() + " fail to update because " + desc.getError());
             updateRequest(cubeRequest, false, omitMessage(desc.getError()));
         }
+
+        //streaming Cube
+        if (cubeRequest.getStreamingCube().equals("true")) {
+            StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest);
+            KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest);
+            if (streamingConfig == null) {
+                cubeRequest.setMessage("No StreamingConfig info to update.");
+                return cubeRequest;
+            }
+            if (kafkaConfig == null) {
+                cubeRequest.setMessage("No KafkaConfig info to update.");
+                return cubeRequest;
+            }
+            if (StringUtils.isEmpty(streamingConfig.getName())) {
+                logger.info("StreamingConfig Name should not be empty.");
+                throw new BadRequestException("StremingConfig name should not be empty.");
+            }
+
+            if (StringUtils.isEmpty(kafkaConfig.getName())) {
+                logger.info("KafkaConfig Name should not be empty.");
+                throw new BadRequestException("KafkaConfig name should not be empty.");
+            }
+
+            try {
+                streamingService.updateStreamingConfig(streamingConfig);
+
+            } catch (IOException e) {
+                logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage());
+            }
+            try {
+                kafkaConfigService.updateKafkaConfig(kafkaConfig);
+            } catch (IOException e) {
+                logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e);
+                throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage());
+            }
+
+        }
+
+
         String descData = JsonUtil.writeValueAsIndentString(desc);
         cubeRequest.setCubeDescData(descData);
         cubeRequest.setSuccessful(true);
@@ -363,12 +494,12 @@ public class CubeController extends BasicController {
     }
 
     /**
-     *get Hbase Info
+     * get Hbase Info
      *
      * @return true
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/hbase", method = { RequestMethod.GET })
+    @RequestMapping(value = "/{cubeName}/hbase", method = {RequestMethod.GET})
     @ResponseBody
     public List<HBaseResponse> getHBaseInfo(@PathVariable String cubeName) {
         List<HBaseResponse> hbase = new ArrayList<HBaseResponse>();
@@ -405,7 +536,7 @@ public class CubeController extends BasicController {
         return hbase;
     }
 
-    @RequestMapping(value = "/{cubeName}/segments", method = { RequestMethod.POST })
+    @RequestMapping(value = "/{cubeName}/segments", method = {RequestMethod.POST})
     @ResponseBody
     public CubeSegmentRequest appendSegment(@PathVariable String cubeName, @RequestBody CubeSegmentRequest cubeSegmentRequest) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
@@ -470,12 +601,49 @@ public class CubeController extends BasicController {
         return desc;
     }
 
+    private StreamingConfig deserializeStreamingDesc(CubeRequest cubeRequest) {
+        StreamingConfig desc = null;
+        try {
+            logger.debug("Saving StreamingConfig " + cubeRequest.getStreamingData());
+            desc = JsonUtil.readValue(cubeRequest.getStreamingData(), StreamingConfig.class);
+        } catch (JsonParseException e) {
+            logger.error("The StreamingConfig definition is not valid.", e);
+            updateRequest(cubeRequest, false, e.getMessage());
+        } catch (JsonMappingException e) {
+            logger.error("The data StreamingConfig definition is not valid.", e);
+            updateRequest(cubeRequest, false, e.getMessage());
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request.", e);
+            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+        }
+        return desc;
+    }
+
+
+    private KafkaConfig deserializeKafkaDesc(CubeRequest cubeRequest) {
+        KafkaConfig desc = null;
+        try {
+            logger.debug("Saving KafkaConfig " + cubeRequest.getKafkaData());
+            desc = JsonUtil.readValue(cubeRequest.getKafkaData(), KafkaConfig.class);
+        } catch (JsonParseException e) {
+            logger.error("The KafkaConfig definition is not valid.", e);
+            updateRequest(cubeRequest, false, e.getMessage());
+        } catch (JsonMappingException e) {
+            logger.error("The data KafkaConfig definition is not valid.", e);
+            updateRequest(cubeRequest, false, e.getMessage());
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request.", e);
+            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+        }
+        return desc;
+    }
+
     /**
      * @return
      */
     private String omitMessage(List<String> errors) {
         StringBuffer buffer = new StringBuffer();
-        for (Iterator<String> iterator = errors.iterator(); iterator.hasNext();) {
+        for (Iterator<String> iterator = errors.iterator(); iterator.hasNext(); ) {
             String string = (String) iterator.next();
             buffer.append(string);
             buffer.append("\n");
@@ -497,4 +665,11 @@ public class CubeController extends BasicController {
         this.jobService = jobService;
     }
 
+    public void setStreamingService(StreamingService streamingService) {
+        this.streamingService = streamingService;
+    }
+
+    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
+        this.kafkaConfigService = kafkaConfigService;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java b/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
index b8af9d6..a409938 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ModelController.java
@@ -117,7 +117,7 @@ public class ModelController extends BasicController {
         try {
             modelDesc = modelService.updateModelAndDesc(modelDesc);
         } catch (AccessDeniedException accessDeniedException) {
-            throw new ForbiddenException("You don't have right to update this cube.");
+            throw new ForbiddenException("You don't have right to update this model.");
         } catch (Exception e) {
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
             throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
new file mode 100644
index 0000000..78b63c0
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.rest.exception.BadRequestException;
+import org.apache.kylin.rest.exception.ForbiddenException;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.exception.NotFoundException;
+import org.apache.kylin.rest.request.StreamingRequest;
+import org.apache.kylin.rest.service.KafkaConfigService;
+import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.AccessDeniedException;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * StreamingController is defined as Restful API entrance for UI.
+ *
+ * @author jiazhong
+ */
+@Controller
+@RequestMapping(value = "/streaming")
+public class StreamingController extends BasicController {
+    private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
+
+    @Autowired
+    private StreamingService streamingService;
+    @Autowired
+    private KafkaConfigService kafkaConfigService;
+
+    @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
+    @ResponseBody
+    public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+        try {
+            return streamingService.getStreamingConfigs(cubeName, limit, offset);
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+        }
+    }
+
+    @RequestMapping(value = "/getKfkConfig", method = { RequestMethod.GET })
+    @ResponseBody
+    public List<KafkaConfig> getKafkaConfigs(@RequestParam(value = "kafkaConfigName", required = false) String kafkaConfigName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+        try {
+            return kafkaConfigService.getKafkaConfigs(kafkaConfigName, limit, offset);
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+        }
+    }
+
+
+    /**
+     *
+     * create Streaming Schema
+     * @throws java.io.IOException
+     */
+    @RequestMapping(value = "", method = { RequestMethod.POST })
+    @ResponseBody
+    public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest streamingRequest) {
+        // Create streaming schema: deserialize and persist the StreamingConfig and KafkaConfig pair
+        StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+        KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+        if (streamingConfig == null || kafkaConfig == null) {
+            return streamingRequest;
+        }
+        if (StringUtils.isEmpty(streamingConfig.getName())) {
+            logger.info("StreamingConfig name should not be empty.");
+            throw new BadRequestException("StreamingConfig name should not be empty.");
+        }
+
+        try {
+            streamingConfig.setUuid(UUID.randomUUID().toString());
+            streamingService.createStreamingConfig(streamingConfig);
+
+        } catch (IOException e) {
+            logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage());
+        }
+        try {
+            kafkaConfig.setUuid(UUID.randomUUID().toString());
+            kafkaConfigService.createKafkaConfig(kafkaConfig);
+        } catch (IOException e) {
+            try {
+                streamingService.dropStreamingConfig(streamingConfig);
+            } catch (IOException e1) {
+                throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage() + "; rollback of StreamingConfig also failed: " + e1.getLocalizedMessage());
+            }
+            logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage());
+        }
+        streamingRequest.setSuccessful(true);
+        return streamingRequest;
+    }
+
+    @RequestMapping(value = "", method = { RequestMethod.PUT })
+    @ResponseBody
+    public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+        StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
+        KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
+
+        if (streamingConfig == null) {
+            return streamingRequest;
+        }
+        try {
+            streamingConfig = streamingService.updateStreamingConfig(streamingConfig);
+        } catch (AccessDeniedException accessDeniedException) {
+            throw new ForbiddenException("You don't have right to update this StreamingConfig.");
+        } catch (Exception e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+        }
+        try {
+            kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig);
+        } catch (AccessDeniedException accessDeniedException) {
+            throw new ForbiddenException("You don't have right to update this KafkaConfig.");
+        } catch (Exception e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+        }
+
+        streamingRequest.setSuccessful(true);
+
+        return streamingRequest;
+    }
+
+    @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
+    @ResponseBody
+    public void deleteConfig(@PathVariable String configName) throws IOException {
+        StreamingConfig config = streamingService.getSreamingManager().getStreamingConfig(configName);
+        KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(configName);
+        if (null == config) {
+            throw new NotFoundException("StreamingConfig with name " + configName + " not found.");
+        }
+        try {
+            streamingService.dropStreamingConfig(config);
+            if (null != kafkaConfig) {
+                kafkaConfigService.dropKafkaConfig(kafkaConfig);
+            }
+        } catch (Exception e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
+        }
+    }
+
+    private StreamingConfig deserializeSchemalDesc(StreamingRequest streamingRequest) {
+        StreamingConfig desc = null;
+        try {
+            logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
+            desc = JsonUtil.readValue(streamingRequest.getStreamingConfig(), StreamingConfig.class);
+        } catch (JsonParseException e) {
+            logger.error("The StreamingConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (JsonMappingException e) {
+            logger.error("The data StreamingConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request.", e);
+            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+        }
+        return desc;
+    }
+
+
+    private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
+        KafkaConfig desc = null;
+        try {
+            logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
+            desc = JsonUtil.readValue(streamingRequest.getKafkaConfig(), KafkaConfig.class);
+        } catch (JsonParseException e) {
+            logger.error("The KafkaConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (JsonMappingException e) {
+            logger.error("The data KafkaConfig definition is not valid.", e);
+            updateRequest(streamingRequest, false, e.getMessage());
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request.", e);
+            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+        }
+        return desc;
+    }
+
+    private void updateRequest(StreamingRequest request, boolean success, String message) {
+        request.setSuccessful(success);
+        request.setMessage(message);
+    }
+
+    public void setStreamingService(StreamingService streamingService) {
+        this.streamingService = streamingService;
+    }
+
+    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
+        this.kafkaConfigService = kafkaConfigService;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java b/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
index 1371a4f..fb9952b 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/CubeRequest.java
@@ -23,9 +23,12 @@ public class CubeRequest {
     private String uuid;
     private String cubeName;
     private String cubeDescData;
+    private String streamingData;
+    private String kafkaData;
     private boolean successful;
     private String message;
     private String project;
+    private String streamingCube;
 
     public String getUuid() {
         return uuid;
@@ -104,4 +107,27 @@ public class CubeRequest {
         this.project = project;
     }
 
+    public String getStreamingCube() {
+        return streamingCube;
+    }
+
+    public void setStreamingCube(String streamingCube) {
+        this.streamingCube = streamingCube;
+    }
+
+    public String getStreamingData() {
+        return streamingData;
+    }
+
+    public void setStreamingData(String streamingData) {
+        this.streamingData = streamingData;
+    }
+
+    public String getKafkaData() {
+        return kafkaData;
+    }
+
+    public void setKafkaData(String kafkaData) {
+        this.kafkaData = kafkaData;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
index 3bcf9d7..07c30f3 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
@@ -25,6 +25,14 @@ import java.lang.String;public class StreamingRequest {
 
     private String tableData;
 
+    private String streamingConfig;
+
+    private String kafkaConfig;
+
+    private boolean successful;
+
+    private String message;
+
     public String getProject() {
         return project;
     }
@@ -40,4 +48,37 @@ import java.lang.String;public class StreamingRequest {
     public void setTableData(String tableData) {
         this.tableData = tableData;
     }
+
+    public boolean isSuccessful() {
+        return successful;
+    }
+
+    public void setSuccessful(boolean successful) {
+        this.successful = successful;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getStreamingConfig() {
+        return streamingConfig;
+    }
+
+    public void setStreamingConfig(String streamingConfig) {
+        this.streamingConfig = streamingConfig;
+    }
+
+    public String getKafkaConfig() {
+        return kafkaConfig;
+    }
+
+    public void setKafkaConfig(String kafkaConfig) {
+        this.kafkaConfig = kafkaConfig;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 70deada..5ac12ea 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -56,6 +57,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,6 +169,14 @@ public abstract class BasicService {
         return CubeManager.getInstance(getConfig());
     }
 
+    public final StreamingManager getSreamingManager() {
+        return StreamingManager.getInstance(getConfig());
+    }
+
+    public final KafkaConfigManager getKafkaManager() throws IOException {
+        return KafkaConfigManager.getInstance(getConfig());
+    }
+
     public final CubeDescManager getCubeDescManager() {
         return CubeDescManager.getInstance(getConfig());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 45cff4d..98ddc6f 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.metadata.MetadataManager;
@@ -35,6 +36,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hybrid.HybridManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,54 +77,62 @@ public class CacheService extends BasicService {
         logger.info(log);
         try {
             switch (cacheType) {
-            case CUBE:
-                CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
-                getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
-                getProjectManager().clearL2Cache();
-                //clean query related cache first
-                super.cleanDataCache(newCube.getUuid());
-                cubeService.updateOnNewSegmentReady(cacheKey);
-                break;
-            case CUBE_DESC:
-                getCubeDescManager().reloadCubeDescLocal(cacheKey);
-                break;
-            case PROJECT:
-                ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
-                removeOLAPDataSource(projectInstance.getName());
-                break;
-            case INVERTED_INDEX:
-                //II update does not need to update storage cache because it is dynamic already
-                getIIManager().reloadIILocal(cacheKey);
-                getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
-                getProjectManager().clearL2Cache();
-                break;
-            case INVERTED_INDEX_DESC:
-                getIIDescManager().reloadIIDescLocal(cacheKey);
-                break;
-            case TABLE:
-                getMetadataManager().reloadTableCache(cacheKey);
-                IIDescManager.clearCache();
-                CubeDescManager.clearCache();
-                break;
-            case DATA_MODEL:
-                getMetadataManager().reloadDataModelDesc(cacheKey);
-                IIDescManager.clearCache();
-                CubeDescManager.clearCache();
-                break;
-            case ALL:
-                MetadataManager.clearCache();
-                CubeDescManager.clearCache();
-                CubeManager.clearCache();
-                IIDescManager.clearCache();
-                IIManager.clearCache();
-                HybridManager.clearCache();
-                RealizationRegistry.clearCache();
-                ProjectManager.clearCache();
-                super.cleanAllDataCache();
-                BasicService.removeAllOLAPDataSources();
-                break;
-            default:
-                throw new RuntimeException("invalid cacheType:" + cacheType);
+                case CUBE:
+                    CubeInstance newCube = getCubeManager().reloadCubeLocal(cacheKey);
+                    getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey);
+                    getProjectManager().clearL2Cache();
+                    //clean query related cache first
+                    super.cleanDataCache(newCube.getUuid());
+                    cubeService.updateOnNewSegmentReady(cacheKey);
+                    break;
+                case STREAMING:
+                    getSreamingManager().reloadStreamingConfigLocal(cacheKey);
+                    break;
+                case KAFKA:
+                    getKafkaManager().reloadKafkaConfigLocal(cacheKey);
+                    break;
+                case CUBE_DESC:
+                    getCubeDescManager().reloadCubeDescLocal(cacheKey);
+                    break;
+                case PROJECT:
+                    ProjectInstance projectInstance = getProjectManager().reloadProjectLocal(cacheKey);
+                    removeOLAPDataSource(projectInstance.getName());
+                    break;
+                case INVERTED_INDEX:
+                    //II update does not need to update storage cache because it is dynamic already
+                    getIIManager().reloadIILocal(cacheKey);
+                    getHybridManager().reloadHybridInstanceByChild(RealizationType.INVERTED_INDEX, cacheKey);
+                    getProjectManager().clearL2Cache();
+                    break;
+                case INVERTED_INDEX_DESC:
+                    getIIDescManager().reloadIIDescLocal(cacheKey);
+                    break;
+                case TABLE:
+                    getMetadataManager().reloadTableCache(cacheKey);
+                    IIDescManager.clearCache();
+                    CubeDescManager.clearCache();
+                    break;
+                case DATA_MODEL:
+                    getMetadataManager().reloadDataModelDesc(cacheKey);
+                    IIDescManager.clearCache();
+                    CubeDescManager.clearCache();
+                    break;
+                case ALL:
+                    MetadataManager.clearCache();
+                    CubeDescManager.clearCache();
+                    CubeManager.clearCache();
+                    IIDescManager.clearCache();
+                    IIManager.clearCache();
+                    HybridManager.clearCache();
+                    RealizationRegistry.clearCache();
+                    ProjectManager.clearCache();
+                    KafkaConfigManager.clearCache();
+                    StreamingManager.clearCache();
+                    super.cleanAllDataCache();
+                    BasicService.removeAllOLAPDataSources();
+                    break;
+                default:
+                    throw new RuntimeException("invalid cacheType:" + cacheType);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);
@@ -133,29 +143,29 @@ public class CacheService extends BasicService {
         final String log = "remove cache type: " + cacheType + " name:" + cacheKey;
         try {
             switch (cacheType) {
-            case CUBE:
-                String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
-                getCubeManager().removeCubeLocal(cacheKey);
-                super.cleanDataCache(storageUUID);
-                break;
-            case CUBE_DESC:
-                getCubeDescManager().removeLocalCubeDesc(cacheKey);
-                break;
-            case PROJECT:
-                ProjectManager.clearCache();
-                break;
-            case INVERTED_INDEX:
-                getIIManager().removeIILocal(cacheKey);
-                break;
-            case INVERTED_INDEX_DESC:
-                getIIDescManager().removeIIDescLocal(cacheKey);
-                break;
-            case TABLE:
-                throw new UnsupportedOperationException(log);
-            case DATA_MODEL:
-                throw new UnsupportedOperationException(log);
-            default:
-                throw new RuntimeException("invalid cacheType:" + cacheType);
+                case CUBE:
+                    String storageUUID = getCubeManager().getCube(cacheKey).getUuid();
+                    getCubeManager().removeCubeLocal(cacheKey);
+                    super.cleanDataCache(storageUUID);
+                    break;
+                case CUBE_DESC:
+                    getCubeDescManager().removeLocalCubeDesc(cacheKey);
+                    break;
+                case PROJECT:
+                    ProjectManager.clearCache();
+                    break;
+                case INVERTED_INDEX:
+                    getIIManager().removeIILocal(cacheKey);
+                    break;
+                case INVERTED_INDEX_DESC:
+                    getIIDescManager().removeIIDescLocal(cacheKey);
+                    break;
+                case TABLE:
+                    throw new UnsupportedOperationException(log);
+                case DATA_MODEL:
+                    throw new UnsupportedOperationException(log);
+                default:
+                    throw new RuntimeException("invalid cacheType:" + cacheType);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java b/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
new file mode 100644
index 0000000..a0b19b2
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/KafkaConfigService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component("kafkaMgmtService")
+public class KafkaConfigService extends BasicService {
+
+    @Autowired
+    private AccessService accessService;
+
+    @PostFilter(Constant.ACCESS_POST_FILTER_READ)
+    public List<KafkaConfig> listAllKafkaConfigs(final String kafkaConfigName) throws IOException {
+        List<KafkaConfig> kafkaConfigs = new ArrayList<KafkaConfig>();
+//        CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
+        if (null == kafkaConfigName) {
+            kafkaConfigs = getKafkaManager().listAllKafkaConfigs();
+        } else {
+            List<KafkaConfig> configs = getKafkaManager().listAllKafkaConfigs();
+            for(KafkaConfig config : configs){
+                if(kafkaConfigName.equals(config.getName())){
+                    kafkaConfigs.add(config);
+                }
+            }
+        }
+
+        return kafkaConfigs;
+    }
+
+    public List<KafkaConfig> getKafkaConfigs(final String kafkaConfigName, final Integer limit, final Integer offset) throws IOException {
+
+        List<KafkaConfig> kafkaConfigs;
+        kafkaConfigs = listAllKafkaConfigs(kafkaConfigName);
+
+        if (limit == null || offset == null) {
+            return kafkaConfigs;
+        }
+
+        if ((kafkaConfigs.size() - offset) < limit) {
+            return kafkaConfigs.subList(offset, kafkaConfigs.size());
+        }
+
+        return kafkaConfigs.subList(offset, offset + limit);
+    }
+
+
+
+    public KafkaConfig createKafkaConfig(KafkaConfig config) throws IOException {
+        if (getKafkaManager().getKafkaConfig(config.getName()) != null) {
+            throw new InternalErrorException("The kafkaConfig named " + config.getName() + " already exists");
+        }
+        getKafkaManager().createKafkaConfig(config.getName(),config);
+        return config;
+    }
+
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public KafkaConfig updateKafkaConfig(KafkaConfig config) throws IOException {
+        return getKafkaManager().updateKafkaConfig(config);
+    }
+
+    public KafkaConfig getKafkaConfig(String configName) throws IOException {
+        return getKafkaManager().getKafkaConfig(configName);
+    }
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public void dropKafkaConfig(KafkaConfig config) throws IOException {
+        getKafkaManager().removeKafkaConfig(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
new file mode 100644
index 0000000..0f67ac8
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PostFilter;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component("streamingMgmtService")
+public class StreamingService extends BasicService {
+
+    @Autowired
+    private AccessService accessService;
+
+    @PostFilter(Constant.ACCESS_POST_FILTER_READ)
+    public List<StreamingConfig> listAllStreamingConfigs(final String cubeName) throws IOException {
+        List<StreamingConfig> streamingConfigs = new ArrayList();
+        CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
+        if (null == cubeInstance) {
+            streamingConfigs = getSreamingManager().listAllStreaming();
+        } else {
+            for(StreamingConfig config : getSreamingManager().listAllStreaming()){
+                if(cubeInstance.getName().equals(config.getCubeName())){
+                    streamingConfigs.add(config);
+                }
+            }
+        }
+
+        return streamingConfigs;
+    }
+
+    public List<StreamingConfig> getStreamingConfigs(final String cubeName, final Integer limit, final Integer offset) throws IOException {
+
+        List<StreamingConfig> streamingConfigs;
+        streamingConfigs = listAllStreamingConfigs(cubeName);
+
+        if (limit == null || offset == null) {
+            return streamingConfigs;
+        }
+
+        if ((streamingConfigs.size() - offset) < limit) {
+            return streamingConfigs.subList(offset, streamingConfigs.size());
+        }
+
+        return streamingConfigs.subList(offset, offset + limit);
+    }
+
+    public StreamingConfig createStreamingConfig(StreamingConfig config) throws IOException {
+        if (getSreamingManager().getStreamingConfig(config.getName()) != null) {
+            throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
+        }
+        StreamingConfig streamingConfig =  getSreamingManager().saveStreamingConfig(config);
+        return streamingConfig;
+    }
+
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
+        return getSreamingManager().updateStreamingConfig(config);
+    }
+
+//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    public void dropStreamingConfig(StreamingConfig config) throws IOException {
+        getSreamingManager().removeStreamingConfig(config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/server/src/main/resources/kylinSecurity.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 518a9bd..ee3c891 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -19,6 +19,8 @@
         <scr:intercept-url pattern="/api/cache*/**" access="permitAll" />
 		<scr:intercept-url pattern="/api/cubes/src/tables" access="hasAnyRole('ROLE_ANALYST')" />
 		<scr:intercept-url pattern="/api/cubes*/**" access="isAuthenticated()" />
+		<scr:intercept-url pattern="/api/models*/**" access="isAuthenticated()" />
+		<scr:intercept-url pattern="/api/streaming*/**" access="isAuthenticated()" />
 		<scr:intercept-url pattern="/api/job*/**" access="isAuthenticated()" />
 		<scr:intercept-url pattern="/api/admin/config" access="permitAll" />
 		<scr:intercept-url pattern="/api/projects" access="permitAll" />

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index d8f9f50..3032d13 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -40,13 +40,21 @@ import com.fasterxml.jackson.databind.type.MapType;
 import com.fasterxml.jackson.databind.type.SimpleType;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -60,15 +68,25 @@ public class KafkaConfigManager {
 
     private KylinConfig config;
 
-    private KafkaConfigManager(KylinConfig config) {
+    // name ==> StreamingConfig
+    private CaseInsensitiveStringCache<KafkaConfig> kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(Broadcaster.TYPE.KAFKA);
+
+    public static final Serializer<KafkaConfig> KAFKA_SERIALIZER = new JsonSerializer<KafkaConfig>(KafkaConfig.class);
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
+        reloadAllKafkaConfig();
     }
 
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }
 
-    public static KafkaConfigManager getInstance(KylinConfig config) {
+    public static KafkaConfigManager getInstance(KylinConfig config){
         KafkaConfigManager r = CACHE.get(config);
         if (r != null) {
             return r;
@@ -79,15 +97,43 @@ public class KafkaConfigManager {
             if (r != null) {
                 return r;
             }
+            try{
             r = new KafkaConfigManager(config);
             CACHE.put(config, r);
             if (CACHE.size() > 1) {
-                logger.warn("More than one cubemanager singleton exist");
+                logger.warn("More than one KafkaConfigManager singleton exist");
             }
             return r;
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
+        }
         }
     }
 
+    public List<KafkaConfig> listAllKafkaConfigs() {
+        return new ArrayList(kafkaMap.values());
+    }
+
+    /**
+     * Reload KafkaConfig from resource store It will be triggered by an desc
+     * update event.
+     *
+     * @param name
+     * @throws IOException
+     */
+    public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
+
+        // Save Source
+        String path = KafkaConfig.getKafkaResourcePath(name);
+
+        // Reload the KafkaConfig
+        KafkaConfig ndesc = loadKafkaConfigAt(path);
+
+        // Here replace the old one
+        kafkaMap.putLocal(ndesc.getName(), ndesc);
+        return ndesc;
+    }
+
    // Stub: always reports the named config as existing.
    // NOTE(review): presumably a placeholder for a real existence check — confirm intent.
    private boolean checkExistence(String name) {
        return true;
    }
@@ -96,9 +142,17 @@ public class KafkaConfigManager {
         return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + name + ".json";
     }
 
-    public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
+    public boolean createKafkaConfig(String name, KafkaConfig config) {
+
+        if (config == null || StringUtils.isEmpty(config.getName())) {
+            throw new IllegalArgumentException();
+        }
+
+        if (kafkaMap.containsKey(config.getName()))
+            throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists");
         try {
             getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+            kafkaMap.put(config.getName(), config);
             return true;
         } catch (IOException e) {
             logger.error("error save resource name:" + name, e);
@@ -106,13 +160,41 @@ public class KafkaConfigManager {
         }
     }
 
-    public KafkaConfig getKafkaConfig(String name) {
-        try {
-            return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get resource name:" + name, e);
-            throw new RuntimeException("error get resource name:" + name, e);
+    public KafkaConfig updateKafkaConfig(KafkaConfig desc) throws IOException {
+        // Validate KafkaConfig
+        if (desc.getUuid() == null || desc.getName() == null) {
+            throw new IllegalArgumentException();
+        }
+        String name = desc.getName();
+        if (!kafkaMap.containsKey(name)) {
+            throw new IllegalArgumentException("KafkaConfig '" + name + "' does not exist.");
+        }
+
+        // Save Source
+        String path = desc.getResourcePath();
+        getStore().putResource(path, desc, KAFKA_SERIALIZER);
+
+        // Reload the KafkaConfig
+        KafkaConfig ndesc = loadKafkaConfigAt(path);
+        // Here replace the old one
+        kafkaMap.put(ndesc.getName(), desc);
+
+        return ndesc;
+    }
+
+    private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
+        ResourceStore store = getStore();
+        KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class,KAFKA_SERIALIZER );
+
+        if (StringUtils.isBlank(kafkaConfig.getName())) {
+            throw new IllegalStateException("KafkaConfig name must not be blank");
         }
+        return kafkaConfig;
+    }
+
+
    /** Returns the cached KafkaConfig with the given name, or null if absent. */
    public KafkaConfig getKafkaConfig(String name) {
        return kafkaMap.get(name);
    }
 
     public void saveKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
@@ -124,6 +206,44 @@ public class KafkaConfigManager {
         getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
     }
 
    // Removes a KafkaConfig: deletes the persisted resource first, then evicts
    // the cache entry (kafkaMap.remove presumably also broadcasts the removal
    // to other nodes — confirm against CaseInsensitiveStringCache).
    public void removeKafkaConfig(KafkaConfig kafkaConfig) throws IOException {
        String path = kafkaConfig.getResourcePath();
        getStore().deleteResource(path);
        kafkaMap.remove(kafkaConfig.getName());
    }
+
+
+    private void reloadAllKafkaConfig() throws IOException {
+        ResourceStore store = getStore();
+        logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAfKA_RESOURCE_ROOT));
+
+        kafkaMap.clear();
+
+        List<String> paths = store.collectResourceRecursively(ResourceStore.KAfKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            KafkaConfig kafkaConfig;
+            try {
+                kafkaConfig = loadKafkaConfigAt(path);
+            } catch (Exception e) {
+                logger.error("Error loading kafkaConfig desc " + path, e);
+                continue;
+            }
+            if (path.equals(kafkaConfig.getResourcePath()) == false) {
+                logger.error("Skip suspicious desc at " + path + ", " + kafkaConfig + " should be at " + kafkaConfig.getResourcePath());
+                continue;
+            }
+            if (kafkaMap.containsKey(kafkaConfig.getName())) {
+                logger.error("Dup KafkaConfig name '" + kafkaConfig.getName() + "' on path " + path);
+                continue;
+            }
+
+            kafkaMap.putLocal(kafkaConfig.getName(), kafkaConfig);
+        }
+
+        logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
+    }
+
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 1aff0ce..90df9f5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -42,12 +42,14 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.MetadataConstants;
 
 /**
  */
@@ -84,6 +86,13 @@ public class KafkaConfig extends RootPersistentEntity {
     //"configA=1;configB=2"
     @JsonProperty("parserProperties")
     private String parserProperties;
    /** Resource-store path where this Kafka config is persisted. */
    public String getResourcePath() {
        return getKafkaResourcePath(name);
    }

    /** Builds the resource-store path for a Kafka config of the given name. */
    public static String getKafkaResourcePath(String streamingName) {
        return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
    }
 
     public List<KafkaClusterConfig> getKafkaClusterConfigs() {
         return kafkaClusterConfigs;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/index.html
----------------------------------------------------------------------
diff --git a/webapp/app/index.html b/webapp/app/index.html
index d2b7e68..caa8d2a 100644
--- a/webapp/app/index.html
+++ b/webapp/app/index.html
@@ -45,6 +45,7 @@
     <link rel="stylesheet" type="text/css" href="css/AdminLTE.css">
     <link rel="stylesheet" type="text/css" href="components/bootstrap-sweetalert/lib/sweet-alert.css">
     <link rel="stylesheet" type="text/css" href="components/angular-bootstrap-nav-tree/dist/abn_tree.css">
+    <link rel="stylesheet" type="text/css" href="components/angular-toggle-switch/angular-toggle-switch.css">
 
     <link rel="stylesheet/less" href="less/build.less">
     <!-- endref -->
@@ -102,6 +103,7 @@
 <script src="components/angular-underscore/angular-underscore.js"></script>
 <script src="components/jquery-ui/jquery-ui.min.js"></script>
 <script src="components/angular-ui-sortable/sortable.js"></script>
+<script src="components/angular-toggle-switch/angular-toggle-switch.js"></script>
 
 <script src="js/app.js"></script>
 <script src="js/config.js"></script>
@@ -120,6 +122,7 @@
 <script src="js/services/model.js"></script>
 
 <script src="js/services/cubes.js"></script>
+<script src="js/services/streaming.js"></script>
 <script src="js/services/graph.js"></script>
 <script src="js/services/jobs.js"></script>
 <script src="js/services/message.js"></script>
@@ -136,6 +139,8 @@
 <script src="js/model/jobConfig.js"></script>
 <script src="js/model/projectConfig.js"></script>
 <script src="js/model/tableConfig.js"></script>
+<script src="js/model/streamingModel.js"></script>
+<script src="js/model/streamingListModel.js"></script>
 <!--New GUI-->
 <script src="js/model/modelConfig.js"></script>
 
@@ -157,6 +162,7 @@
 <script src="js/controllers/job.js"></script>
 <script src="js/controllers/cube.js"></script>
 <script src="js/controllers/cubes.js"></script>
+<script src="js/controllers/streaming.js"></script>
 <script src="js/controllers/projects.js"></script>
 <script src="js/controllers/cubeEdit.js"></script>
 <script src="js/controllers/cubeSchema.js"></script>
@@ -166,15 +172,17 @@
 <script src="js/controllers/projectMeta.js"></script>
 <script src="js/controllers/cubeModel.js"></script>
 <script src="js/controllers/cubeDimensions.js"></script>
-<script src="js/controllers/cubeFilter.js"></script>
 <script src="js/controllers/cubeRefresh.js"></script>
 <script src="js/controllers/cubeAdvanceSetting.js"></script>
+<script src="js/controllers/cubeMeasures.js"></script>
 <!--New GUI-->
 <script src="js/controllers/modelSchema.js"></script>
 <script src="js/controllers/modelDimensions.js"></script>
 <script src="js/controllers/modelRefresh.js"></script>
 <script src="js/controllers/modelEdit.js"></script>
 
+<script src="js/controllers/streamingConfig.js"></script>
+
 <!--New GUI-->
 <script src="js/controllers/models.js"></script>
 
@@ -222,5 +230,27 @@
     </div>
 </script>
 
+<!-- static templates for streaming config save/update result notification -->
+<script type="text/ng-template" id="streamingResultError.html">
+  <div class="callout">
+    <h4>Error Message</h4>
+    <p>{{text}}</p>
+  </div>
+  <div class="callout callout-danger">
+    <h4>Streaming Schema</h4>
+    <pre>{{streamingSchema}}</pre>
+  </div>
+  <div class="callout callout-danger">
+    <h4>Kafka Schema</h4>
+    <pre>{{kfkSchema}}</pre>
+  </div>
+</script>
+
+<script type="text/ng-template" id="streamingResultSuccess.html">
+  <div class="callout callout-info">
+    <p>{{text}}</p>
+  </div>
+</script>
+
 </body>
 </html>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/app.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/app.js b/webapp/app/js/app.js
index 1e00bdc..dc9d087 100644
--- a/webapp/app/js/app.js
+++ b/webapp/app/js/app.js
@@ -17,4 +17,4 @@
 */
 
 //Kylin Application Module
-KylinApp = angular.module('kylin', ['ngRoute', 'ngResource', 'ngGrid', 'ui.bootstrap', 'ui.ace', 'base64', 'angularLocalStorage', 'localytics.directives', 'treeControl', 'nvd3ChartDirectives','ngLoadingRequest','oitozero.ngSweetAlert','ngCookies','angular-underscore', 'ngAnimate', 'ui.sortable','angularBootstrapNavTree']);
+KylinApp = angular.module('kylin', ['ngRoute', 'ngResource', 'ngGrid', 'ui.bootstrap', 'ui.ace', 'base64', 'angularLocalStorage', 'localytics.directives', 'treeControl', 'nvd3ChartDirectives','ngLoadingRequest','oitozero.ngSweetAlert','ngCookies','angular-underscore', 'ngAnimate', 'ui.sortable','angularBootstrapNavTree','toggle-switch']);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/53b383d9/webapp/app/js/controllers/cubeAdvanceSetting.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/cubeAdvanceSetting.js b/webapp/app/js/controllers/cubeAdvanceSetting.js
index fb9c019..fc02c6e 100644
--- a/webapp/app/js/controllers/cubeAdvanceSetting.js
+++ b/webapp/app/js/controllers/cubeAdvanceSetting.js
@@ -98,4 +98,32 @@ KylinApp.controller('CubeAdvanceSettingCtrl', function ($scope, $modal,cubeConfi
     })
   }
 
+
  // Appends a new, empty rowkey column entry; defaults to dictionary encoding,
  // zero fixed length and not mandatory.
  $scope.addNewRowkeyColumn = function () {
    $scope.cubeMetaFrame.rowkey.rowkey_columns.push({
      "column": "",
      "length": 0,
      "dictionary": "true",
      "mandatory": false
    });
  };

  // Appends an empty aggregation group to the cube rowkey definition.
  $scope.addNewAggregationGroup = function () {
    $scope.cubeMetaFrame.rowkey.aggregation_groups.push([]);
  };

  // Replaces the aggregation group at `index` with the given value.
  // No-op when `aggregation_groups` is falsy.
  $scope.refreshAggregationGroup = function (list, index, aggregation_groups) {
    if (aggregation_groups) {
      list[index] = aggregation_groups;
    }
  };

  // Removes the first occurrence of `element` from `arr`, if present.
  $scope.removeElement = function (arr, element) {
    var index = arr.indexOf(element);
    if (index > -1) {
      arr.splice(index, 1);
    }
  };
+
+
 });