You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/09/24 06:16:49 UTC

[1/3] incubator-kylin git commit: KYLIN-1042, [Data Source]support create Table Schema from Streaming Source Record

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1011 6d862747d -> 22ef792cf (forced update)


KYLIN-1042, [Data Source]support create Table Schema from Streaming Source Record


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1ea58708
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1ea58708
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1ea58708

Branch: refs/heads/KYLIN-1011
Commit: 1ea587081743fb5eec00760a494edce0a606b39b
Parents: cc15d63
Author: jiazhong <ji...@ebay.com>
Authored: Fri Sep 18 18:35:30 2015 +0800
Committer: jiazhong <ji...@ebay.com>
Committed: Wed Sep 23 16:37:11 2015 +0800

----------------------------------------------------------------------
 .../kylin/rest/controller/TableController.java  |  27 +-
 .../kylin/rest/request/StreamingRequest.java    |  43 ++
 webapp/app/js/controllers/sourceMeta.js         | 209 ++++++++-
 webapp/app/js/model/tableConfig.js              |   5 +-
 webapp/app/js/services/tables.js                |   1 +
 webapp/app/less/app.less                        |   5 +
 webapp/app/less/component.less                  |   4 +
 webapp/app/partials/cubeDesigner/info.html      |   4 +-
 .../app/partials/modelDesigner/data_model.html  |   4 +-
 .../app/partials/modelDesigner/model_info.html  |   4 +-
 webapp/app/partials/models/models_tree.html     |   5 +
 .../app/partials/tables/source_table_tree.html  |   2 +-
 webapp/app/partials/tables/table_detail.html    | 439 ++++++++++++-------
 13 files changed, 570 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/server/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 133b5ad..39af7db 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -19,19 +19,18 @@
 package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.CardinalityRequest;
+import org.apache.kylin.rest.request.StreamingRequest;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.service.CubeService;
 import org.slf4j.Logger;
@@ -129,6 +128,22 @@ public class TableController extends BasicController {
         return result;
     }
 
+
+    @RequestMapping(value = "/addStreamingSrc", method = { RequestMethod.POST })
+    @ResponseBody
+    public Map<String, String> addStreamingTable(@RequestBody StreamingRequest request) throws IOException {
+        Map<String, String> result = new HashMap<String, String>();
+        String project = request.getProject();
+        TableDesc desc = JsonUtil.readValue(request.getTableData(),TableDesc.class);
+        desc.setUuid(UUID.randomUUID().toString());
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        metaMgr.saveSourceTable(desc);
+        cubeMgmtService.syncTableToProject(new String[]{desc.getName()},project);
+        result.put("success","true");
+        return result;
+    }
+
+
     /**
      * Regenerate table cardinality
      *

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
new file mode 100644
index 0000000..3bcf9d7
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingRequest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.rest.request;
+
+import java.lang.String;public class StreamingRequest {
+
+    private String project;
+
+    private String tableData;
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getTableData() {
+        return tableData;
+    }
+
+    public void setTableData(String tableData) {
+        this.tableData = tableData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index 942c079..9d405dc 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -152,12 +152,211 @@ KylinApp
         })
       }
     };
-    $scope.trimType = function (typeName) {
-      if (typeName.match(/VARCHAR/i)) {
-        typeName = "VARCHAR";
+
+
+    //streaming model
+    $scope.openStreamingSourceModal = function () {
+      $modal.open({
+        templateUrl: 'addStreamingSource.html',
+        controller: StreamingSourceCtrl,
+        resolve: {
+          tableNames: function () {
+            return $scope.tableNames;
+          },
+          projectName: function () {
+            return $scope.projectModel.selectedProject;
+          },
+          scope: function () {
+            return $scope;
+          }
+        }
+      });
+    };
+
+    var StreamingSourceCtrl = function ($scope, $location, $modalInstance, tableNames, MessageService, projectName, scope, tableConfig) {
+      $scope.streamingPrefix = "STREAMING_";
+      $scope.projectName = projectName;
+      $scope.tableConfig = tableConfig;
+      $scope.streaming = {
+        sourceSchema: '',
+        'parseResult': {}
       }
 
-      return typeName.trim().toLowerCase();
-    }
+      $scope.table = {
+        name: '',
+        sourceValid:false
+      }
+
+      $scope.cancel = function () {
+        $modalInstance.dismiss('cancel');
+      };
+
+      $scope.streamingOnLoad = function () {
+        console.log($scope.streaming.sourceSchema);
+      }
+
+      $scope.columnList = [];
+
+      $scope.streamingOnChange = function () {
+        console.log($scope.streaming.sourceSchema);
+        try {
+          $scope.streaming.parseResult = JSON.parse($scope.streaming.sourceSchema);
+        } catch (error) {
+          $scope.table.sourceValid = false;
+          console.log(error);
+          return;
+        }
+        $scope.table.sourceValid = true;
+        var columnList = [];
+        for (var key in $scope.streaming.parseResult) {
+          var defaultType="varchar(256)";
+          var _value = $scope.streaming.parseResult[key];
+          var defaultChecked = "Y";
+          if(typeof _value ==="string"){
+            defaultType="varchar(256)";
+          }else if(typeof _value ==="number"){
+            if(_value <= 2147483647){
+              if(_value.toString().indexOf(".")!=-1){
+                defaultType="decimal";
+              }else{
+                defaultType="int";
+              }
+            }else{
+              defaultType="timestamp";
+            }
+          }
+          if(defaultType=="timestamp"){
+            defaultChecked = "N";
+          }
+          columnList.push({
+            'name': key,
+            'checked': defaultChecked,
+            'type': defaultType,
+            'fromSource':'Y'
+          });
+
+
+
+          //var formatList = [];
+          //var
+          columnList = _.sortBy(columnList, function (i) { return i.type; });
+        }
+
+          var timeMeasure = ['year_start','month_start','day_start','hour_start','min_start'];
+          for(var i = 0;i<timeMeasure.length;i++){
+            var defaultCheck = 'Y';
+            if(timeMeasure[i]=='min_start'){
+              defaultCheck = 'N';
+            }
+            columnList.push({
+              'name': timeMeasure[i],
+              'checked': defaultCheck,
+              'type': 'timestamp',
+              'fromSource':'N'
+            });
+          }
+
+        if($scope.columnList.length==0){
+          $scope.columnList = columnList;
+        }
+
+        angular.forEach(columnList,function(item){
+          var included = false;
+          for(var i=0;i<$scope.columnList.length;i++){
+            if($scope.columnList[i].name==item.name){
+              included = true;
+              break;
+            }
+          }
+          if(!included){
+            $scope.columnList.push(item);
+          }
+        })
+
+      }
+
+      $scope.form={};
+      $scope.rule={
+        'timestampColumnConflict':false
+      }
+      $scope.syncStreamingSchema = function () {
+        $scope.form['setStreamingSchema'].$sbumitted = true;
+        if(!$scope.streaming.sourceSchema||$scope.streaming.sourceSchema===""){
+          return;
+        }
+
+        if(!$scope.table.name||$scope.table.name===""){
+          return;
+        }
+
+        var timestampCount = 0;
+        angular.forEach($scope.columnList,function(item){
+          if(item.checked == "Y"&&item.type=="timestamp"&&item.fromSource=='Y'){
+            timestampCount++;
+          }
+        })
+
+        if(timestampCount!=1){
+          $scope.rule.timestampColumnConflict = true;
+          return;
+        }
+
+        var columns = [];
+        angular.forEach($scope.columnList,function(column,$index){
+          if (column.checked == "Y") {
+            var columnInstance = {
+              "id": ++$index,
+              "name": column.name,
+              "datatype": column.type
+            }
+            columns.push(columnInstance);
+          }
+        })
+
+
+        $scope.tableData = {
+          "name": $scope.streamingPrefix+$scope.table.name,
+          "columns": columns,
+          'database':'Default'
+        }
+
+        SweetAlert.swal({
+          title: '',
+          text: 'Are you sure to create the streaming table info?',
+          type: '',
+          showCancelButton: true,
+          confirmButtonColor: '#DD6B55',
+          confirmButtonText: "Yes",
+          closeOnConfirm: true
+        }, function (isConfirm) {
+          if (isConfirm) {
+            loadingRequest.show();
+            TableService.addStreamingSrc({}, {project: $scope.projectName,tableData:angular.toJson($scope.tableData)}, function (request) {
+              if(request.success){
+                loadingRequest.hide();
+                SweetAlert.swal('', 'Create Streaming Table Schema Successfully.', 'success');
+                $scope.cancel();
+                scope.aceSrcTbLoaded(true);
+                return;
+              }else{
+                SweetAlert.swal('Oops...', "Failed to take action.", 'error');
+              }
+              //end loading
+              loadingRequest.hide();
+            }, function (e) {
+              if (e.data && e.data.exception) {
+                var message = e.data.exception;
+                var msg = !!(message) ? message : 'Failed to take action.';
+                SweetAlert.swal('Oops...', msg, 'error');
+              } else {
+                SweetAlert.swal('Oops...', "Failed to take action.", 'error');
+              }
+              loadingRequest.hide();
+            });
+          }
+        });
+      }
+    };
+
   });
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/js/model/tableConfig.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/tableConfig.js b/webapp/app/js/model/tableConfig.js
index f994b09..3dd7b25 100644
--- a/webapp/app/js/model/tableConfig.js
+++ b/webapp/app/js/model/tableConfig.js
@@ -22,6 +22,9 @@ KylinApp.constant('tableConfig', {
     {attr: 'name', name: 'Name'},
     {attr: 'datatype', name: 'Data Type'},
     {attr: 'cardinality', name: 'Cardinality'}
-  ]
+  ],
+  dataTypes:["tinyint","smallint","int","bigint","float","double","decimal","timestamp","date","string","varchar(256)","char","boolean","binary"]
+
+
 
 });

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/js/services/tables.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
index 1f1f15a..3b5e9f4 100755
--- a/webapp/app/js/services/tables.js
+++ b/webapp/app/js/services/tables.js
@@ -23,6 +23,7 @@ KylinApp.factory('TableService', ['$resource', function ($resource, config) {
     getExd: {method: 'GET', params: {action: 'exd-map'}, isArray: false},
     reload: {method: 'PUT', params: {action: 'reload'}, isArray: false},
     loadHiveTable: {method: 'POST', params: {}, isArray: false},
+    addStreamingSrc: {method: 'POST', params: {action:'addStreamingSrc'}, isArray: false},
     genCardinality: {method: 'PUT', params: {action: 'cardinality'}, isArray: false}
   });
 }]);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/less/app.less
----------------------------------------------------------------------
diff --git a/webapp/app/less/app.less b/webapp/app/less/app.less
index a7651fe..fc80698 100644
--- a/webapp/app/less/app.less
+++ b/webapp/app/less/app.less
@@ -661,3 +661,8 @@ ul.messenger .messenger-message-inner,.ngCellText {
   -o-transition: width 0.5s;;
   transition: width 0.5s;;
 }
+
+.form-group.required .control-label:after {
+  content:"*";
+  color:red;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/less/component.less
----------------------------------------------------------------------
diff --git a/webapp/app/less/component.less b/webapp/app/less/component.less
index a160752..929006f 100644
--- a/webapp/app/less/component.less
+++ b/webapp/app/less/component.less
@@ -996,3 +996,7 @@ ul.abn-tree li.abn-tree-row a{
 .sweet-alert .lead.text-muted{
   word-break:break-all;
 }
+
+.modal-body.streaming-source .ace_editor{
+  height: 600px !important;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/partials/cubeDesigner/info.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/cubeDesigner/info.html b/webapp/app/partials/cubeDesigner/info.html
index 1684516..f680487 100644
--- a/webapp/app/partials/cubeDesigner/info.html
+++ b/webapp/app/partials/cubeDesigner/info.html
@@ -20,7 +20,7 @@
     <div class="col-xs-8">
         <ng-form name="forms.cube_info_form" novalidate>
             <!--Project-->
-            <div class="form-group">
+            <div class="form-group required">
                 <div class="row">
                   <label class="col-xs-12 col-sm-3 control-label no-padding-right">
                     <b>Model Name</b>
@@ -40,7 +40,7 @@
             </div>
 
             <!--Cube Name-->
-            <div class="form-group">
+            <div class="form-group required">
                 <div class="row">
                     <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default">
                         <b>Cube Name</b>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/partials/modelDesigner/data_model.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/modelDesigner/data_model.html b/webapp/app/partials/modelDesigner/data_model.html
index 60cbacb..e100287 100644
--- a/webapp/app/partials/modelDesigner/data_model.html
+++ b/webapp/app/partials/modelDesigner/data_model.html
@@ -20,9 +20,9 @@
     <ng-form name="forms.data_model_form">
 
     <!-- Fact Table Name -->
-    <div class="form-group">
+    <div class="form-group required">
         <div class="row">
-            <label class="col-xs-12 col-sm-2 concube.detailtrol-label no-padding-right font-color-default">
+            <label class="col-xs-12 col-sm-2 control-label concube.detailtrol-label no-padding-right font-color-default">
                 <b>Fact Table</b>
             </label>
             <div class="col-xs-12 col-sm-6" ng-class="{'has-error':forms.data_model_form.innerform.typeahead.$invalid && (forms.data_model_form.innerform.typeahead.$dirty||forms.data_model_form.$sbumitted)}">

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/partials/modelDesigner/model_info.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/modelDesigner/model_info.html b/webapp/app/partials/modelDesigner/model_info.html
index 18fac09..aed4325 100644
--- a/webapp/app/partials/modelDesigner/model_info.html
+++ b/webapp/app/partials/modelDesigner/model_info.html
@@ -22,7 +22,7 @@
         <ng-form name="forms.model_info_form" novalidate>
 
             <!--Model Name-->
-            <div class="form-group">
+            <div class="form-group required">
                 <div class="row">
                     <label class="col-xs-12 col-sm-3 control-label no-padding-right font-color-default">
                         <b>Model Name</b>
@@ -96,4 +96,4 @@
         </div><!-- /.box-body -->
     </div>
     </div>
-</div>
\ No newline at end of file
+</div>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/partials/models/models_tree.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/models/models_tree.html b/webapp/app/partials/models/models_tree.html
index b04d481..722d65f 100644
--- a/webapp/app/partials/models/models_tree.html
+++ b/webapp/app/partials/models/models_tree.html
@@ -43,6 +43,11 @@
           <li ng-if="userService.hasRole('ROLE_ADMIN')">
             <a href="models/add"  ng-if="userService.hasRole('ROLE_MODELER')"><i class="fa fa-star"></i> New Model</a>
           </li>
+
+          <li ng-if="userService.hasRole('ROLE_ADMIN')">
+            <a href="streaming/add"  ng-if="userService.hasRole('ROLE_MODELER')"><i class="fa fa-area-chart"></i>New Streaming</a>
+          </li>
+
         </ul>
       </div>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/partials/tables/source_table_tree.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/source_table_tree.html b/webapp/app/partials/tables/source_table_tree.html
index 2e5de36..ff95d10 100755
--- a/webapp/app/partials/tables/source_table_tree.html
+++ b/webapp/app/partials/tables/source_table_tree.html
@@ -26,7 +26,7 @@
         <div class="col-xs-5" style="padding-left: 0px;margin-top: 20px;">
             <div class="pull-right">
                 <a class="btn btn-xs btn-primary" tooltip="Load Hive Table"  ng-if="userService.hasRole('ROLE_ADMIN')"  ng-click="openModal()"><i class="fa fa-download"></i></a>
-                <a class="btn btn-xs btn-success" tooltip="Refresh Tables" ng-click="aceSrcTbChanged()"><i class="fa fa-refresh"></i></a>
+                <a class="btn btn-xs btn-primary" tooltip="Add Streaming Table"  ng-if="userService.hasRole('ROLE_ADMIN')"  ng-click="openStreamingSourceModal()"><i class="fa fa-area-chart"></i></a>
             </div>
         </div>
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ea58708/webapp/app/partials/tables/table_detail.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html
index dc1492d..e692269 100644
--- a/webapp/app/partials/tables/table_detail.html
+++ b/webapp/app/partials/tables/table_detail.html
@@ -16,187 +16,300 @@
 * limitations under the License.
 -->
 
-<div  ng-controller="SourceMetaCtrl" class="nav-tabs-custom">
-    <div class="col-xs-12" ng-show="tableModel.selectedSrcDb&&tableModel.selectedSrcTable.name">
-        <h3 class="text-info">Table Schema:{{ tableModel.selectedSrcTable.name}}</h3>
-        <div class="tabbable nav-tabs-custom">
-            <ul class="nav nav-tabs">
-                <li class="active">
-                    <a data-toggle="tab" href="#column">Columns</a>
-                </li>
-                <li>
-                    <a data-toggle="tab" href="#schema">Extend Information</a>
-                </li>
-            </ul>
-            <div class="tab-content">
-                <!--Schema-->
-                <div id="schema" class="tab-pane">
-                    <div ng-if="tableModel.selectedSrcTable.uuid" class="table-responsive">
-                        <table class="table">
-                            <tbody>
-                            <tr>
-                                <th style="width:20%">NAME</th>
-                                <td>{{ tableModel.selectedSrcTable.name}}</td>
-                            </tr>
-                            <tr>
-                                <th>Hive DATABASE</th>
-                                <td>{{tableModel.selectedSrcTable.database}}</td>
-                            </tr>
-                            <tr>
-                                <th>SNAPSHOT TIME</th>
-                                <td>{{tableModel.selectedSrcTable.exd.lastUpdateTime | utcToConfigTimeZone}}</td>
-                            </tr>
-                            <tr>
-                                <th>LOCATION</th>
-                                <td>{{tableModel.selectedSrcTable.exd.location}}</td>
-                            </tr>
-                            <tr>
-                                <th>INPUT FORMAT</th>
-                                <td>{{tableModel.selectedSrcTable.exd.inputformat}}</td>
-                            </tr>
-                            <tr>
-                                <th>OUTPUT FORMAT</th>
-                                <td>{{tableModel.selectedSrcTable.exd.outputformat}}</td>
-                            </tr>
-                            <tr>
-                                <th>OWNER</th>
-                                <td><a href="mailto:{{tableModel.selectedSrcTable.exd.owner}}">{{tableModel.selectedSrcTable.exd.owner}}</a></td>
-                            </tr>
-                            <tr>
-                                <th>TOTAL FILE NUMBER</th>
-                                <td>{{tableModel.selectedSrcTable.exd.totalNumberFiles}}</td>
-                            </tr>
-                            <tr>
-                                <th>TOTAL FILE SIZE</th>
-                                <td>{{tableModel.selectedSrcTable.exd.totalFileSize}}</td>
-                            </tr>
-                            <tr>
-                                <th>PARTITIONED</th>
-                                <td>{{tableModel.selectedSrcTable.exd.partitioned}}</td>
-                            </tr>
-                            <tr>
-                                <th>PARTITION COLUMNS</th>
-                                <td>{{tableModel.selectedSrcTable.exd.partitionColumns}}</td>
-                            </tr>
-                            </tbody>
-                        </table>
-                    </div>
-                </div>
-                <!--Columns-->
-                <div id="column" class="tab-pane active">
-                    <div class="profile-user-info">
-                        <div>
-                            <label class="table-header-text">Columns</label>
+<div ng-controller="SourceMetaCtrl" class="nav-tabs-custom">
+  <div class="col-xs-12" ng-show="tableModel.selectedSrcDb&&tableModel.selectedSrcTable.name">
+    <h3 class="text-info">Table Schema:{{ tableModel.selectedSrcTable.name}}</h3>
+
+    <div class="tabbable nav-tabs-custom">
+      <ul class="nav nav-tabs">
+        <li class="active">
+          <a data-toggle="tab" href="#column">Columns</a>
+        </li>
+        <li>
+          <a data-toggle="tab" href="#schema">Extend Information</a>
+        </li>
+      </ul>
+      <div class="tab-content">
+        <!--Schema-->
+        <div id="schema" class="tab-pane">
+          <div ng-if="tableModel.selectedSrcTable.uuid" class="table-responsive">
+            <table class="table">
+              <tbody>
+              <tr>
+                <th style="width:20%">NAME</th>
+                <td>{{ tableModel.selectedSrcTable.name}}</td>
+              </tr>
+              <tr>
+                <th>Hive DATABASE</th>
+                <td>{{tableModel.selectedSrcTable.database}}</td>
+              </tr>
+              <tr>
+                <th>SNAPSHOT TIME</th>
+                <td>{{tableModel.selectedSrcTable.exd.lastUpdateTime | utcToConfigTimeZone}}</td>
+              </tr>
+              <tr>
+                <th>LOCATION</th>
+                <td>{{tableModel.selectedSrcTable.exd.location}}</td>
+              </tr>
+              <tr>
+                <th>INPUT FORMAT</th>
+                <td>{{tableModel.selectedSrcTable.exd.inputformat}}</td>
+              </tr>
+              <tr>
+                <th>OUTPUT FORMAT</th>
+                <td>{{tableModel.selectedSrcTable.exd.outputformat}}</td>
+              </tr>
+              <tr>
+                <th>OWNER</th>
+                <td><a
+                  href="mailto:{{tableModel.selectedSrcTable.exd.owner}}">{{tableModel.selectedSrcTable.exd.owner}}</a>
+                </td>
+              </tr>
+              <tr>
+                <th>TOTAL FILE NUMBER</th>
+                <td>{{tableModel.selectedSrcTable.exd.totalNumberFiles}}</td>
+              </tr>
+              <tr>
+                <th>TOTAL FILE SIZE</th>
+                <td>{{tableModel.selectedSrcTable.exd.totalFileSize}}</td>
+              </tr>
+              <tr>
+                <th>PARTITIONED</th>
+                <td>{{tableModel.selectedSrcTable.exd.partitioned}}</td>
+              </tr>
+              <tr>
+                <th>PARTITION COLUMNS</th>
+                <td>{{tableModel.selectedSrcTable.exd.partitionColumns}}</td>
+              </tr>
+              </tbody>
+            </table>
+          </div>
+        </div>
+        <!--Columns-->
+        <div id="column" class="tab-pane active">
+          <div class="profile-user-info">
+            <div>
+              <label class="table-header-text">Columns</label>
                             <span class="input-icon form-search nav-search pull-right">
-                                <input type="text" placeholder="Filter ..." class="nav-search-input" ng-model="columnName"/>
+                                <input type="text" placeholder="Filter ..." class="nav-search-input"
+                                       ng-model="columnName"/>
                                 <i class="ace-icon fa fa-search nav-search-icon"></i>
                             </span>
-                        </div>
-                        <div class="space-6"></div>
-                        <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length>0">
-                            <table class="table table-hover table-striped list">
-                                <thead>
-                                <tr style="cursor: pointer">
-                                    <th ng-repeat="theaditem in tableConfig.theaditems"
-                                        ng-click="state.filterAttr= theaditem.attr;state.reverseColumn=theaditem.attr;state.filterReverse=!state.filterReverse;">
-                                        {{theaditem.name}}
-                                        <i ng-if="state.reverseColumn!= theaditem.attr"
-                                           class="fa fa-unsorted"></i>
-                                        <i ng-if="state.reverseColumn== theaditem.attr && !state.filterReverse"
-                                           class="fa fa-sort-asc"></i>
-                                        <i ng-if="state.reverseColumn== theaditem.attr && state.filterReverse"
-                                           class="fa fa-sort-desc"></i>
-                                    </th>
-                                </tr>
-                                </thead>
-
-                                <tr ng-repeat="column in tableModel.selectedSrcTable.columns | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
-                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
-                                        {{ column.id}}
-                                    </td>
-                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
-                                        {{ column.name}}
-                                    </td>
-                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
-                                        {{ column.datatype}}
-                                    </td>
-                                    <td style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
-                                        <!--{{ tableModel.selectedSrcTable.cardinality[column.name]}}-->
-                                        {{column.cardinality}}
-                                    </td>
-                                </tr>
-                            </table>
-                        </div>
-                        <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length == 0" no-result
-                             text="No Matched Table Column."></div>
-                        <div ng-if="!!!tableModel.selectedSrcTable.uuid">
-                            <div no-result text="No Table Selected."></div>
-                        </div>
-                    </div>
-                </div>
             </div>
+            <div class="space-6"></div>
+            <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length>0">
+              <table class="table table-hover table-striped list">
+                <thead>
+                <tr style="cursor: pointer">
+                  <th ng-repeat="theaditem in tableConfig.theaditems"
+                      ng-click="state.filterAttr= theaditem.attr;state.reverseColumn=theaditem.attr;state.filterReverse=!state.filterReverse;">
+                    {{theaditem.name}}
+                    <i ng-if="state.reverseColumn!= theaditem.attr"
+                       class="fa fa-unsorted"></i>
+                    <i ng-if="state.reverseColumn== theaditem.attr && !state.filterReverse"
+                       class="fa fa-sort-asc"></i>
+                    <i ng-if="state.reverseColumn== theaditem.attr && state.filterReverse"
+                       class="fa fa-sort-desc"></i>
+                  </th>
+                </tr>
+                </thead>
+
+                <tr
+                  ng-repeat="column in tableModel.selectedSrcTable.columns | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
+                  <td
+                    style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                    {{ column.id}}
+                  </td>
+                  <td
+                    style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                    {{ column.name}}
+                  </td>
+                  <td
+                    style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                    {{ column.datatype}}
+                  </td>
+                  <td
+                    style="{{(tableModel.selectedSrcTable.selectedSrcColumn.id == column.id)? 'background-color:#EBF9FE':''}}">
+                    <!--{{ tableModel.selectedSrcTable.cardinality[column.name]}}-->
+                    {{column.cardinality}}
+                  </td>
+                </tr>
+              </table>
+            </div>
+            <div ng-if="(tableModel.selectedSrcTable.columns | filter: columnName).length == 0" no-result
+                 text="No Matched Table Column."></div>
+            <div ng-if="!!!tableModel.selectedSrcTable.uuid">
+              <div no-result text="No Table Selected."></div>
+            </div>
+          </div>
         </div>
+      </div>
     </div>
+  </div>
 
 
-    <!--show load hive table tip when no models list-->
-    <div ng-show="!tableModel.selectedSrcDb.length" style="margin-top:40px;">
+  <!--show load hive table tip when no models list-->
+  <div ng-show="!tableModel.selectedSrcDb.length" style="margin-top:40px;">
 
-        <!--project selected tip-->
-        <div ng-if="projectModel.getSelectedProject();" class="box box-primary">
-            <div class="box-header with-border">
-                <i class="fa fa-bullhorn"></i>
-                <h3 class="box-title">No tables</h3>
-            </div>
-            <div class="box-body">
-                <div>
-                    <a tooltip="Load Hive Table" href="javascript:void(0);" ng-if="userService.hasRole('ROLE_MODELER')" ng-click="openModal()">Click here to load your hive table</a>
-                </div>
-            </div><!-- /.box-body -->
-        </div>
-        <!--roject not selected tip-->
-        <div ng-if="!projectModel.getSelectedProject();" class="box box-primary">
-            <div class="box-header with-border">
-                <i class="icon fa fa-warning"></i>
-                <h3 class="box-title">No project selected</h3>
-            </div>
-            <div class="box-body">
-                <div class="callout callout-danger">
-                    <p  class="text-danger">
-                       Please select your project first
-                    </p>
-                </div>
-            </div><!-- /.box-body -->
+    <!--project selected tip-->
+    <div ng-if="projectModel.getSelectedProject();" class="box box-primary">
+      <div class="box-header with-border">
+        <i class="fa fa-bullhorn"></i>
+
+        <h3 class="box-title">No tables</h3>
+      </div>
+      <div class="box-body">
+        <div>
+          <a tooltip="Load Hive Table" href="javascript:void(0);" ng-if="userService.hasRole('ROLE_MODELER')"
+             ng-click="openModal()">Click here to load your hive table</a>
         </div>
+      </div>
+      <!-- /.box-body -->
     </div>
+    <!--roject not selected tip-->
+    <div ng-if="!projectModel.getSelectedProject();" class="box box-primary">
+      <div class="box-header with-border">
+        <i class="icon fa fa-warning"></i>
 
-    <!--show load hive table tip when no models list-->
-    <div ng-show="tableModel.selectedSrcDb.length&&!tableModel.selectedSrcTable.name" style="margin-top:40px;">
-        <div class="box box-primary">
-            <div class="box-header with-border">
-                <i class="fa fa-bullhorn"></i>
-                <h3 class="box-title">No table selected</h3>
-            </div>
-            <div class="box-body">
-                Select your table
-            </div><!-- /.box-body -->
+        <h3 class="box-title">No project selected</h3>
+      </div>
+      <div class="box-body">
+        <div class="callout callout-danger">
+          <p class="text-danger">
+            Please select your project first
+          </p>
         </div>
+      </div>
+      <!-- /.box-body -->
     </div>
+  </div>
 
+  <!--show load hive table tip when no models list-->
+  <div ng-show="tableModel.selectedSrcDb.length&&!tableModel.selectedSrcTable.name" style="margin-top:40px;">
+    <div class="box box-primary">
+      <div class="box-header with-border">
+        <i class="fa fa-bullhorn"></i>
 
-    <script type="text/ng-template" id="addHiveTable.html">
-        <div class="modal-header">
-            <h4>Load Hive Table Metadata</h4>
-        </div>
-        <div class="modal-body">
-            <span><strong>Project: </strong>{{ $parent.projectName!=null?$parent.projectName:'NULL'}}</span>
-            <label for="tables"> Table Names:(Seperate with comma)</label>
+        <h3 class="box-title">No table selected</h3>
+      </div>
+      <div class="box-body">
+        Select your table
+      </div>
+      <!-- /.box-body -->
+    </div>
+  </div>
+
+
+  <script type="text/ng-template" id="addHiveTable.html">
+    <div class="modal-header">
+      <h4>Load Hive Table Metadata</h4>
+    </div>
+    <div class="modal-body">
+      <span><strong>Project: </strong>{{ $parent.projectName!=null?$parent.projectName:'NULL'}}</span>
+      <label for="tables"> Table Names:(Seperate with comma)</label>
             <textarea ng-model="$parent.tableNames" class="form-control" id="tables"
                       placeholder="table1,table2  By default,system will choose 'Default' as database,you can specify database like this 'database.table'"></textarea>
+    </div>
+    <div class="modal-footer">
+      <button class="btn btn-primary" ng-click="add()">Sync</button>
+      <button class="btn btn-primary" ng-click="cancel()">Cancel</button>
+    </div>
+  </script>
+
+  <script type="text/ng-template" id="addStreamingSource.html">
+    <div class="modal-header">
+      <h2>Create Streaming Table Schema</h2>
+    </div>
+
+    <div class="modal-body streaming-source" style="height: 480px;">
+      <div class="col-xs-5">
+        <p class="text-info">
+          Need to input streaming source record here, will detect the source schema and create a table schema for
+          streaming.
+        </p>
+
+        <div style="padding:15px;" class="has-error">
+          <small class="help-block" ng-show="streaming.sourceSchema==''&&form.setStreamingSchema.$sbumitted">Please
+            input Streaming source record to generate schema.
+          </small>
+        </div>
+        <div style="margin-bottom: 20px;">
+          <span class="label label-info">JSON</span>
         </div>
-        <div class="modal-footer">
-            <button class="btn btn-primary" ng-click="add()">Sync</button>
-            <button class="btn btn-primary" ng-click="cancel()">Cancel</button>
+        <div ng-model="streaming.sourceSchema" ui-ace="{
+    useWrapMode : true,
+    mode:'json',
+    onLoad: streamingOnLoad
+  }">
+
         </div>
-    </script>
+      </div>
+      <div class="col-xs-1" style="margin-top:300px;text-align:center;">
+        <button type="button" class="btn btn-primary" ng-click="streamingOnChange()"><i
+          class="fa fa-angle-double-right fa-5" style="font-size:2em;"></i></button>
+      </div>
+      <div class="col-xs-6" ng-show="table.sourceValid">
+        <ol class="text-info" style="margin-bottom: 30px;">
+          <li>Choose one 'timestamp' type column for streaming table.</li>
+          <li>Uncheck the 'timestamp' type column which will not be used.</li>
+        </ol>
+        <form class="form-horizontal" name="form.setStreamingSchema" novalidate>
+          <div class="form-group required">
+            <label class="col-xs-4 control-label" style="text-align: left;">Table Name</label>
+
+            <div class="col-xs-8"
+                 ng-class="{'has-error':form.setStreamingSchema.streamingObject.$invalid && (form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$sbumitted)}">
+              <input type="text" name="streamingObject" required="" ng-model="table.name" class="form-control"/>
+              <small class="help-block"
+                     ng-show="form.setStreamingSchema.streamingObject.$error.required&&(form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$sbumitted)">
+                Table name is required.
+              </small>
+            </div>
+          </div>
+        </form>
+        <table class="table table-hover table-bordered">
+          <tr>
+            <th>Check As Column</th>
+            <th>Column</th>
+            <th>Column Type</th>
+            <th>Comment</th>
+          </tr>
+          <tr ng-repeat="column in columnList">
+            <td><label style="width:100%;cursor: pointer;" for="{{column.name}}"><input style="width:1em;height:1em;"
+                                                                                        type="checkbox"
+                                                                                        id="{{column.name}}"
+                                                                                        ng-model="column.checked"
+                                                                                        ng-true-value="Y"
+                                                                                        ng-false-value="N"/></label>
+            </td>
+            <td>{{column.name}}</td>
+            <td>
+              <select chosen ng-model="column.type"
+                      ng-options="type as type for type in tableConfig.dataTypes"
+                      data-placeholder="select a column type"
+                      style="width: 200px !important;"
+                      class="chosen-select">
+              </select>
+            </td>
+            <td>
+              <label ng-if="column.type=='timestamp'&&column.fromSource=='Y'" class="badge badge-info">TIMESTAMP</label>
+              <label ng-if="column.fromSource=='N'" class="badge badge-info">AUTO APPEND</label>
+            </td>
+          </tr>
+        </table>
+
+        <div class="has-error" ng-if="rule.timestampColumnConflict">
+          <small class="help-block">
+            You should choose one, and only one 'timestamp' type column generated from source schema.
+          </small>
+        </div>
+      </div>
+    </div>
+    <div class="modal-footer">
+      <button class="btn btn-primary" ng-click="syncStreamingSchema()" ng-disabled="form.setStreamingSchema.$invalid">
+        Submit
+      </button>
+      <button class="btn btn-primary" ng-click="cancel()">Cancel</button>
+    </div>
+  </script>
 </div>



[2/3] incubator-kylin git commit: KYLIN-1011

Posted by qh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
new file mode 100644
index 0000000..9b5071b
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -0,0 +1,142 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * each json message with a "timestamp" field
+ */
+public final class TimedJsonStreamParser implements StreamingParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
+
+    private List<TblColRef> allColumns;
+    private boolean formatTs = false;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private String tsColName = "timestamp";
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+    public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) {
+        this.allColumns = allColumns;
+        if (!StringUtils.isEmpty(propertiesStr)) {
+            String[] properties = propertiesStr.split(";");
+            for (String prop : properties) {
+                try {
+                    String[] parts = prop.split("=");
+                    if (parts.length == 2) {
+                        switch (parts[0]) {
+                        case "formatTs":
+                            this.formatTs = Boolean.valueOf(parts[1]);
+                            break;
+                        case "tsColName":
+                            this.tsColName = parts[1];
+                            break;
+                        default:
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to parse property " + prop);
+                    //ignore
+                }
+            }
+        }
+
+        logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", formatTs, tsColName);
+    }
+
+    @Override
+    public StreamingMessage parse(MessageAndOffset messageAndOffset) {
+        try {
+            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            String tsStr = root.get(tsColName);
+            //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
+            //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));
+            long t;
+            if (StringUtils.isEmpty(tsStr)) {
+                t = 0;
+            } else {
+                t = Long.valueOf(tsStr);
+            }
+            ArrayList<String> result = Lists.newArrayList();
+
+            for (TblColRef column : allColumns) {
+                String columnName = column.getName();
+                if (columnName.equalsIgnoreCase("minute_start")) {
+                    long minuteStart = TimeUtil.getMinuteStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart));
+                } else if (columnName.equalsIgnoreCase("hour_start")) {
+                    long hourStart = TimeUtil.getHourStart(t);
+                    result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart));
+                } else if (columnName.equalsIgnoreCase("day_start")) {
+                    //of day start we'll add yyyy-mm-dd
+                    long ts = TimeUtil.getDayStart(t);
+                    result.add(DateFormat.formatToDateStr(ts));
+                } else {
+                    String x = root.get(columnName.toLowerCase());
+                    result.add(x);
+                }
+            }
+
+            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
+
+        } catch (IOException e) {
+            logger.error("error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index b56231a..1aff0ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -81,6 +81,10 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("margin")
     private long margin;
 
+    //"configA=1;configB=2"
+    @JsonProperty("parserProperties")
+    private String parserProperties;
+
     public List<KafkaClusterConfig> getKafkaClusterConfigs() {
         return kafkaClusterConfigs;
     }
@@ -141,6 +145,14 @@ public class KafkaConfig extends RootPersistentEntity {
         this.margin = margin;
     }
 
+    public String getParserProperties() {
+        return parserProperties;
+    }
+
+    public void setParserProperties(String parserProperties) {
+        this.parserProperties = parserProperties;
+    }
+
     @Override
     public KafkaConfig clone() {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
index 25c011e..f6112a6 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
@@ -91,7 +91,7 @@ public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase {
 
         List<TableRecord> records = iterateRecords(slices);
         //dump(records);
-        System.out.println(records.size() + " records");
+        System.out.println("table name:" + tableName + " has " + records.size() + " records");
     }
 
     private List<TableRecord> iterateRecords(List<Slice> slices) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
deleted file mode 100644
index 7b9831a..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
-    public static void main(String[] args) {
-        Preconditions.checkArgument(args[0].equals("monitor"));
-
-        int i = 1;
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        while (i < args.length) {
-            String argName = args[i];
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
-                break;
-            case "-host":
-                host = args[++i];
-                break;
-            case "-tableName":
-                tableName = args[++i];
-                break;
-            case "-authorization":
-                authorization = args[++i];
-                break;
-            case "-cubeName":
-                cubeName = args[++i];
-                break;
-            case "-projectName":
-                projectName = args[++i];
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-            i++;
-        }
-        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host);
-            Preconditions.checkNotNull(authorization);
-            Preconditions.checkNotNull(tableName);
-            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
-            streamingMonitor.checkCube(receivers, cubeName,host);
-        }
-        System.exit(0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
deleted file mode 100644
index e23f065..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
-    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
-        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
-        StringBuilder stringBuilder = new StringBuilder();
-        String url = host + "/kylin/api/query";
-        PostMethod request = new PostMethod(url);
-        try {
-
-            request.addRequestHeader("Authorization", "Basic " + authorization);
-            request.addRequestHeader("Content-Type", "application/json");
-            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
-            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
-            int statusCode = new HttpClient().executeMethod(request);
-            String msg = Bytes.toString(request.getResponseBody());
-            stringBuilder.append("host:").append(host).append("\n");
-            stringBuilder.append("query:").append(query).append("\n");
-            stringBuilder.append("statusCode:").append(statusCode).append("\n");
-            if (statusCode == 200) {
-                title += "succeed";
-                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
-                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
-                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
-            } else {
-                title += "failed";
-                stringBuilder.append("response:").append(msg).append("\n");
-            }
-        } catch (Exception e) {
-            final StringWriter out = new StringWriter();
-            e.printStackTrace(new PrintWriter(out));
-            title += "failed";
-            stringBuilder.append(out.toString());
-        } finally {
-            request.releaseConnection();
-        }
-        logger.info("title:" + title);
-        logger.info("content:" + stringBuilder.toString());
-        sendMail(receivers, title, stringBuilder.toString());
-    }
-
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
-            }
-        }
-        return gaps;
-    }
-
-    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
-        logger.info("totally " + segments.size() + " cubeSegments");
-        Collections.sort(segments);
-        return segments;
-    }
-
-    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else {
-                overlaps.add(Pair.newPair(first.getName(), second.getName()));
-            }
-        }
-        return overlaps;
-    }
-
-    public void checkCube(List<String> receivers, String cubeName, String host) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
-        }
-        List<Pair<Long, Long>> gaps = findGaps(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        StringBuilder content = new StringBuilder();
-        if (!gaps.isEmpty()) {
-            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
-                @Nullable
-                @Override
-                public String apply(Pair<Long, Long> input) {
-                    return parseInterval(input);
-                }
-            }), "\n")).append("\n");
-        }
-        if (!overlaps.isEmpty()) {
-            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
-        }
-        if (content.length() > 0) {
-            logger.info(content.toString());
-            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
-        } else {
-            logger.info("no gaps or overlaps");
-        }
-    }
-
-    private String parseInterval(Pair<Long, Long> interval) {
-        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
-    }
-
-    private void sendMail(List<String> receivers, String title, String content) {
-        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
-        mailService.sendMail(receivers, title, content, false);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 95fbc9d..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.streaming.BrokerConfig;
-import org.apache.kylin.streaming.KafkaClusterConfig;
-import org.apache.kylin.streaming.StreamingConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class KafkaDataLoader {
-
-    public static void loadIntoKafka(StreamingConfig streamingConfig, List<String> messages) {
-
-        KafkaClusterConfig clusterConfig = streamingConfig.getKafkaClusterConfigs().get(0);
-        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
-            @Nullable
-            @Override
-            public String apply(BrokerConfig brokerConfig) {
-                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
-            }
-        }), ",");
-        Properties props = new Properties();
-        props.put("metadata.broker.list", brokerList);
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-
-        ProducerConfig config = new ProducerConfig(props);
-
-        Producer<String, String> producer = new Producer<String, String>(config);
-
-        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
-        for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), messages.get(i));
-            keyedMessages.add(keyedMessage);
-        }
-        producer.send(keyedMessages);
-        producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 4212fea..551006f 100644
--- a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -63,7 +63,7 @@ import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.monitor.StreamingMonitor;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.HBaseConnection;


[3/3] incubator-kylin git commit: KYLIN-1011

Posted by qh...@apache.org.
KYLIN-1011


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/22ef792c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/22ef792c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/22ef792c

Branch: refs/heads/KYLIN-1011
Commit: 22ef792cfe1152b0c6066bd2f7b466f81bb9d0cc
Parents: 1ea5870
Author: qianhao.zhou <qi...@ebay.com>
Authored: Wed Sep 9 19:13:01 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Sep 24 12:06:24 2015 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   5 -
 .../kylin/job/BuildCubeWithStreamTest.java      |  37 +++--
 .../apache/kylin/job/BuildIIWithStreamTest.java |  81 +++++++---
 .../java/org/apache/kylin/job/DeployUtil.java   |  34 ++--
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |  85 ----------
 .../kylin/job/hadoop/invertedindex/IITest.java  |  39 ++---
 .../job/streaming/CubeStreamConsumerTest.java   |  90 -----------
 .../kylin/job/streaming/KafkaDataLoader.java    |  53 +++++++
 .../streaming/PeriodicalStreamBuilderTest.java  | 144 -----------------
 build/bin/kylin.sh                              |   4 +-
 build/script/compress.sh                        |   1 +
 .../kylin/common/persistence/ResourceStore.java |   1 +
 .../kylin/engine/streaming/StreamingCLI.java    |  99 ------------
 .../kylin/engine/streaming/cli/MonitorCLI.java  |  70 +++++++++
 .../engine/streaming/cli/StreamingCLI.java      | 120 +++++++++++++++
 .../streaming/monitor/StreamingMonitor.java     | 154 +++++++++++++++++++
 .../engine/streaming/util/StreamingUtils.java   |   2 +-
 .../localmeta/kafka/kafka_test.json             |  20 +++
 .../kafka/test_streaming_table_cube.json        |  22 +++
 .../kafka/test_streaming_table_ii.json          |  22 +++
 .../streaming/test_streaming_table_cube.json    |  18 +--
 .../streaming/test_streaming_table_ii.json      |  18 +--
 .../invertedindex/streaming/SliceBuilder.java   |  81 ++++++++++
 .../kafka/ByteBufferBackedInputStream.java      |  53 +++++++
 .../kylin/source/kafka/KafkaConfigManager.java  |  94 ++---------
 .../kylin/source/kafka/KafkaStreamingInput.java |   8 +-
 .../source/kafka/StringStreamingParser.java     |  65 ++++++++
 .../source/kafka/TimedJsonStreamParser.java     | 142 +++++++++++++++++
 .../kylin/source/kafka/config/KafkaConfig.java  |  12 ++
 .../hbase/ii/ITInvertedIndexHBaseTest.java      |   2 +-
 .../apache/kylin/job/monitor/MonitorCLI.java    |  69 ---------
 .../kylin/job/monitor/StreamingMonitor.java     | 154 -------------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  54 -------
 .../kylin/job/streaming/StreamingBootstrap.java |   2 +-
 34 files changed, 948 insertions(+), 907 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 99557fb..9f17913 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>kylin-invertedindex</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-streaming</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index b02b2f2..c0e7978 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -34,21 +34,18 @@
 
 package org.apache.kylin.job;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.job.streaming.BootstrapConfig;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
 import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.StreamingManager;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -56,6 +53,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
 /**
  *  for streaming cubing case "test_streaming_table"
  */
@@ -84,12 +85,14 @@ public class BuildCubeWithStreamTest {
 
         kylinConfig = KylinConfig.getInstanceFromEnv();
 
-        //Use a random toplic for kafka data stream
-        StreamingConfig streamingConfig = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
+        final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
+
+        //Use a random topic for kafka data stream
+        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName);
         streamingConfig.setTopic(UUID.randomUUID().toString());
-        StreamingManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
+        KafkaConfigManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
 
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, streamingConfig);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig);
     }
 
     @AfterClass
@@ -119,14 +122,10 @@ public class BuildCubeWithStreamTest {
 
     @Test
     public void test() throws Exception {
+        logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval));
         for (long start = startTime; start < endTime; start += batchInterval) {
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            bootstrapConfig.setStart(start);
-            bootstrapConfig.setEnd(start + batchInterval);
-            bootstrapConfig.setOneOff(true);
-            bootstrapConfig.setPartitionId(0);
-            bootstrapConfig.setStreaming(streamingName);
-            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
+            logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval));
+            new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 5ca3b29..256297e 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -39,28 +39,34 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -68,10 +74,8 @@ import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.source.hive.HiveTableReader;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -184,32 +188,32 @@ public class BuildIIWithStreamTest {
                 logger.info("measure:" + tblColRef.getName());
             }
         }
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<StreamMessage>();
         final IISegment segment = createSegment(iiName);
+        final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier());
         String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
         ToolRunner.run(new IICreateHTableJob(), args);
 
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName, queue, new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0), 0, segment.getIIDesc().getSliceSize());
+        final IIDesc iiDesc = segment.getIIDesc();
+        final SliceBuilder sliceBuilder = new SliceBuilder(desc, (short) 0, iiDesc.isUseLocalDictionary());
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();
+        ArrayList<StreamingMessage> messages = Lists.newArrayList();
         for (String[] row : sorted) {
-            logger.info("another row: " + StringUtils.join(row, ","));
-            queue.put(parse(row));
+            if (messages.size() < iiDesc.getSliceSize()) {
+                messages.add(parse(row));
+            } else {
+                build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
+                messages = Lists.newArrayList();
+                messages.add((parse(row)));
+            }
+        }
+        if (!messages.isEmpty()) {
+            build(sliceBuilder, new StreamingBatch(messages, Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())), htable);
         }
 
         reader.close();
         logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
-        queue.put(StreamMessage.EOF);
-        final Future<?> future = executorService.submit(streamBuilder);
-        try {
-            future.get();
-        } catch (Exception e) {
-            logger.error("stream build failed", e);
-            fail("stream build failed");
-        }
-
         logger.info("stream build finished, htable name:" + segment.getStorageLocationIdentifier());
     }
 
@@ -224,9 +228,38 @@ public class BuildIIWithStreamTest {
             }
         }
     }
+    
+    private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) {
+        final Slice slice = sliceBuilder.buildSlice(batch);
+        try {
+            loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
+        List<Put> data = Lists.newArrayList();
+        for (IIRow row : codec.encodeKeyValue(slice)) {
+            final byte[] key = row.getKey().get();
+            final byte[] value = row.getValue().get();
+            Put put = new Put(key);
+            put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+            final ImmutableBytesWritable dictionary = row.getDictionary();
+            final byte[] dictBytes = dictionary.get();
+            if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) {
+                put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes);
+            } else {
+                throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength());
+            }
+            data.add(put);
+        }
+        hTable.put(data);
+        //omit hTable.flushCommits(), because htable is auto flush
+    }
 
-    private StreamMessage parse(String[] row) {
-        return new StreamMessage(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
+    private StreamingMessage parse(String[] row) {
+        return new StreamingMessage(Lists.newArrayList(row), System.currentTimeMillis(), System.currentTimeMillis(), Collections.<String, Object>emptyMap());
     }
 
     private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index d47a664..722b6b8 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,14 +18,9 @@
 
 package org.apache.kylin.job;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -35,6 +30,7 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
 import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
@@ -43,15 +39,16 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveClient;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamingConfig;
-import org.apache.kylin.streaming.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.*;
+import java.util.List;
 
 public class DeployUtil {
     private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
@@ -146,14 +143,15 @@ public class DeployUtil {
         deployHiveTables();
     }
 
-    public static void prepareTestDataForStreamingCube(long startTime, long endTime, StreamingConfig streamingConfig) throws IOException {
-        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(streamingConfig.getCubeName());
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
+        List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
         TableDesc tableDesc = cubeInstance.getFactTableDesc();
-
         //load into kafka
-        KafkaDataLoader.loadIntoKafka(streamingConfig, data);
-        logger.info("Write {} messages into topic {}", data.size(), streamingConfig.getTopic());
+        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
+        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data2);
+        logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
 
         //csv data for H2 use
         List<TblColRef> tableColumns = Lists.newArrayList();
@@ -163,7 +161,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(new StreamMessage(0, json.getBytes())).getStreamMessage();
+            List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
deleted file mode 100644
index 6a615cb..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.streaming.StreamingBootstrap;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-@Ignore("this test case will break existing metadata store")
-public class ITKafkaBasedIIStreamBuilderTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(ITKafkaBasedIIStreamBuilderTest.class);
-
-    private KylinConfig kylinConfig;
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final StreamingBootstrap bootstrap = StreamingBootstrap.getInstance(kylinConfig);
-        bootstrap.start("eagle", 0);
-        Thread.sleep(30 * 60 * 1000);
-        logger.info("time is up, stop streaming");
-        bootstrap.stop();
-        Thread.sleep(5 * 1000);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index dcd460b..913a2f7 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -10,12 +10,18 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.Slice;
@@ -26,6 +32,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.invertedindex.streaming.SliceBuilder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -33,6 +40,8 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.StreamingParser;
+import org.apache.kylin.source.kafka.StringStreamingParser;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
@@ -41,18 +50,11 @@ import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.ClearTextDictionar
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.EndpointAggregators;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint;
 import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StringStreamParser;
-import org.apache.kylin.streaming.invertedindex.SliceBuilder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -78,24 +80,23 @@ public class IITest extends LocalFileMetadataTestCase {
         this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
         this.iiDesc = ii.getDescriptor();
 
-        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
+        List<MessageAndOffset> messages = Lists.transform(Arrays.asList(inputData), new Function<String, MessageAndOffset>() {
             @Nullable
             @Override
-            public StreamMessage apply(String input) {
-                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
+            public MessageAndOffset apply(String input) {
+                return new MessageAndOffset(new Message(input.getBytes()), System.currentTimeMillis());
             }
         });
 
-        List<List<String>> parsedStreamMessages = Lists.newArrayList();
-        StreamParser parser = StringStreamParser.instance;
-
-        MicroStreamBatch batch = new MicroStreamBatch(0);
-        for (StreamMessage message : streamMessages) {
-            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
-            if ((parsedStreamMessage.isAccepted())) {
-                batch.add(parsedStreamMessage);
+        final StreamingParser parser = StringStreamingParser.instance;
+        final List<StreamingMessage> streamingMessages = Lists.transform(messages, new Function<MessageAndOffset, StreamingMessage>() {
+            @Nullable
+            @Override
+            public StreamingMessage apply(@Nullable MessageAndOffset input) {
+                return parser.parse(input);
             }
-        }
+        });
+        StreamingBatch batch = new StreamingBatch(streamingMessages, Pair.newPair(0L, System.currentTimeMillis()));
 
         iiRows = Lists.newArrayList();
         final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
deleted file mode 100644
index be4fa26..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-@Ignore
-public class CubeStreamConsumerTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStreamConsumerTest.class);
-
-    private KylinConfig kylinConfig;
-
-    private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready";
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        // remove all existing segments
-        CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-
-    }
-
-    @Test
-    public void test() throws Exception {
-        LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(queue);
-        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
-        loadDataFromLocalFile(queue, 100000);
-        future.get();
-    }
-
-    private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException {
-        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
-        String line;
-        int count = 0;
-        while ((line = br.readLine()) != null && count++ < maxCount) {
-            final List<String> strings = Arrays.asList(line.split("\t"));
-            queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes()));
-        }
-        queue.put(StreamMessage.EOF);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..c3caa9b
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.BrokerConfig;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+
+/**
+ * Load prepared data into kafka(for test use)
+ */
+public class KafkaDataLoader {
+
+    public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
+
+        KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
+        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
+            @Nullable
+            @Override
+            public String apply(BrokerConfig brokerConfig) {
+                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+            }
+        }), ",");
+        Properties props = new Properties();
+        props.put("metadata.broker.list", brokerList);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        Producer<String, String> producer = new Producer<String, String>(config);
+
+        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
+        for (int i = 0; i < messages.size(); ++i) {
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
+            keyedMessages.add(keyedMessage);
+        }
+        producer.send(keyedMessages);
+        producer.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java b/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
deleted file mode 100644
index dc6d312..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/PeriodicalStreamBuilderTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.MicroStreamBatchConsumer;
-import org.apache.kylin.streaming.ParsedStreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamParser;
-import org.apache.kylin.streaming.StreamingManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class PeriodicalStreamBuilderTest extends LocalFileMetadataTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(PeriodicalStreamBuilderTest.class);
-
-    @Before
-    public void setup() {
-        this.createTestMetadata();
-
-    }
-
-    @After
-    public void clear() {
-        this.cleanupTestMetadata();
-    }
-
-    private List<StreamMessage> prepareTestData(long start, long end, int count) {
-        double step = (double) (end - start) / (count - 1);
-        long ts = start;
-        int offset = 0;
-        ArrayList<StreamMessage> result = Lists.newArrayList();
-        for (int i = 0; i < count - 1; ++i) {
-            result.add(new StreamMessage(offset++, String.valueOf(ts).getBytes()));
-            ts += step;
-        }
-        result.add(new StreamMessage(offset++, String.valueOf(end).getBytes()));
-        assertEquals(count, result.size());
-        assertEquals(start + "", new String(result.get(0).getRawData()));
-        assertEquals(end + "", new String(result.get(count - 1).getRawData()));
-        return result;
-    }
-
-    @Test
-    public void test() throws ExecutionException, InterruptedException {
-
-        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-        queues.add(new LinkedBlockingQueue<StreamMessage>());
-
-        final long interval = 3000L;
-        final long nextPeriodStart = TimeUtil.getNextPeriodStart(System.currentTimeMillis(), interval);
-
-        final List<Integer> partitionIds = Lists.newArrayList();
-        for (int i = 0; i < queues.size(); i++) {
-            partitionIds.add(i);
-        }
-
-        final MicroStreamBatchConsumer consumer = new MicroStreamBatchConsumer() {
-            @Override
-            public void consume(MicroStreamBatch microStreamBatch) throws Exception {
-                logger.info("consuming batch:" + microStreamBatch.getPartitionId() + " count:" + microStreamBatch.size() + " timestamp:" + microStreamBatch.getTimestamp() + " offset:" + microStreamBatch.getOffset());
-            }
-
-            @Override
-            public void stop() {
-                logger.info("consumer stopped");
-            }
-        };
-        final StreamBuilder streamBuilder = StreamBuilder.newPeriodicalStreamBuilder("test", queues, consumer, nextPeriodStart, interval);
-
-        streamBuilder.setStreamParser(new StreamParser() {
-            @Override
-            public ParsedStreamMessage parse(StreamMessage streamMessage) {
-                return new ParsedStreamMessage(Collections.<String> emptyList(), streamMessage.getOffset(), Long.parseLong(new String(streamMessage.getRawData())), true);
-            }
-        });
-
-        Future<?> future = Executors.newSingleThreadExecutor().submit(streamBuilder);
-        long timeout = nextPeriodStart + interval;
-        int messageCount = 0;
-        int inPeriodMessageCount = 0;
-        int expectedOffset = 0;
-        logger.info("prepare to add StreamMessage");
-        while (true) {
-            long ts = System.currentTimeMillis();
-            if (ts >= timeout + interval) {
-                break;
-            }
-            if (ts >= nextPeriodStart && ts < timeout) {
-                inPeriodMessageCount++;
-            }
-            for (BlockingQueue<StreamMessage> queue : queues) {
-                queue.put(new StreamMessage(messageCount, String.valueOf(ts).getBytes()));
-            }
-            if (expectedOffset == 0 && ts >= timeout) {
-                expectedOffset = messageCount - 1;
-            }
-            messageCount++;
-            Thread.sleep(10);
-        }
-        logger.info("totally put " + messageCount + " StreamMessages");
-        logger.info("totally in period " + inPeriodMessageCount + " StreamMessages");
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.put(StreamMessage.EOF);
-        }
-
-        future.get();
-
-        for (BlockingQueue<StreamMessage> queue : queues) {
-            queue.take();
-        }
-
-        final Map<Integer, Long> offsets = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getOffset("test", partitionIds);
-        logger.info("offset:" + offsets);
-        for (Long offset : offsets.values()) {
-            assertEquals(expectedOffset, offset.longValue());
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index b27864c..b581e09 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -115,7 +115,7 @@ then
         -Dkylin.hive.dependency=${hive_dependency} \
         -Dkylin.hbase.dependency=${hbase_dependency} \
         -Dspring.profiles.active=${spring_profile} \
-        org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
+        org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
         echo "streaming started name: $3 id: $4"
         exit 0
     elif [ $2 == "stop" ]
@@ -170,7 +170,7 @@ then
     -Dkylin.hive.dependency=${hive_dependency} \
     -Dkylin.hbase.dependency=${hbase_dependency} \
     -Dspring.profiles.active=${spring_profile} \
-    org.apache.kylin.job.monitor.MonitorCLI $@ >> ${KYLIN_HOME}/logs/monitor.log 2>&1
+    org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
     exit 0
 else
     echo "usage: kylin.sh start or kylin.sh stop"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/build/script/compress.sh
----------------------------------------------------------------------
diff --git a/build/script/compress.sh b/build/script/compress.sh
index a424b98..c70e567 100755
--- a/build/script/compress.sh
+++ b/build/script/compress.sh
@@ -21,6 +21,7 @@ rm -rf lib tomcat commit_SHA1
 find kylin-${version} -type d -exec chmod 755 {} \;
 find kylin-${version} -type f -exec chmod 644 {} \;
 find kylin-${version} -type f -name "*.sh" -exec chmod 755 {} \;
+mkdir -p ../dist
 tar -cvzf ../dist/kylin-${version}.tar.gz kylin-${version}
 rm -rf kylin-${version}
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 29e2345..89100a2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -54,6 +54,7 @@ abstract public class ResourceStore {
     public static final String TABLE_RESOURCE_ROOT = "/table";
     public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
     public static final String STREAMING_RESOURCE_ROOT = "/streaming";
+    public static final String KAfKA_RESOURCE_ROOT = "/kafka";
     public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
     public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
deleted file mode 100644
index 8bf52c1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class StreamingCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
-
-    public static void main(String[] args) {
-        try {
-            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
-
-            Preconditions.checkArgument(args[0].equals("streaming"));
-            Preconditions.checkArgument(args[1].equals("start"));
-
-            int i = 2;
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            while (i < args.length) {
-                String argName = args[i];
-                switch (argName) {
-                case "-oneoff":
-                    bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
-                    break;
-                case "-start":
-                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
-                    break;
-                case "-end":
-                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
-                    break;
-                case "-streaming":
-                    bootstrapConfig.setStreaming(args[++i]);
-                    break;
-                case "-partition":
-                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
-                    break;
-                case "-fillGap":
-                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
-                    break;
-                default:
-                    logger.warn("ignore this arg:" + argName);
-                }
-                i++;
-            }
-            final Runnable runnable = new OneOffStreamingBuilder(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd()).build();
-            runnable.run();
-            logger.info("streaming process stop, exit with 0");
-            System.exit(0);
-        } catch (Exception e) {
-            printArgsError(args);
-            logger.error("error start streaming", e);
-            System.exit(-1);
-        }
-    }
-
-    private static void printArgsError(String[] args) {
-        logger.warn("invalid args:" + StringUtils.join(args, " "));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
new file mode 100644
index 0000000..d7dc6b3
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.engine.streaming.cli;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class MonitorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
+
+    public static void main(String[] args) {
+        Preconditions.checkArgument(args[0].equals("monitor"));
+
+        int i = 1;
+        List<String> receivers = null;
+        String host = null;
+        String tableName = null;
+        String authorization = null;
+        String cubeName = null;
+        String projectName = "default";
+        while (i < args.length) {
+            String argName = args[i];
+            switch (argName) {
+            case "-receivers":
+                receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
+                break;
+            case "-host":
+                host = args[++i];
+                break;
+            case "-tableName":
+                tableName = args[++i];
+                break;
+            case "-authorization":
+                authorization = args[++i];
+                break;
+            case "-cubeName":
+                cubeName = args[++i];
+                break;
+            case "-projectName":
+                projectName = args[++i];
+                break;
+            default:
+                throw new RuntimeException("invalid argName:" + argName);
+            }
+            i++;
+        }
+        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
+        final StreamingMonitor streamingMonitor = new StreamingMonitor();
+        if (tableName != null) {
+            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+            Preconditions.checkNotNull(host);
+            Preconditions.checkNotNull(authorization);
+            Preconditions.checkNotNull(tableName);
+            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
+        }
+        if (cubeName != null) {
+            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
+            streamingMonitor.checkCube(receivers, cubeName,host);
+        }
+        System.exit(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
new file mode 100644
index 0000000..a4ccabc
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -0,0 +1,120 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.engine.streaming.cli;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class StreamingCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
+
+    public static void main(String[] args) {
+        try {
+            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
+
+            Preconditions.checkArgument(args[0].equals("streaming"));
+            Preconditions.checkArgument(args[1].equals("start"));
+
+            int i = 2;
+            BootstrapConfig bootstrapConfig = new BootstrapConfig();
+            while (i < args.length) {
+                String argName = args[i];
+                switch (argName) {
+                case "-oneoff":
+                    Boolean.parseBoolean(args[++i]);
+                    break;
+                case "-start":
+                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
+                    break;
+                case "-end":
+                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+                    break;
+                case "-streaming":
+                    bootstrapConfig.setStreaming(args[++i]);
+                    break;
+                case "-partition":
+                    bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+                    break;
+                case "-fillGap":
+                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
+                    break;
+                default:
+                    logger.warn("ignore this arg:" + argName);
+                }
+                i++;
+            }
+            if (bootstrapConfig.isFillGap()) {
+                final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
+                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+                logger.info("all gaps:" + StringUtils.join(gaps, ","));
+                for (Pair<Long, Long> gap : gaps) {
+                    startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
+                }
+            } else {
+                startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+                logger.info("streaming process finished, exit with 0");
+                System.exit(0);
+            }
+        } catch (Exception e) {
+            printArgsError(args);
+            logger.error("error start streaming", e);
+            System.exit(-1);
+        }
+    }
+    
+    private static void startOneOffCubeStreaming(String streaming, long start, long end) {
+        final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build();
+        runnable.run();
+    }
+
+    private static void printArgsError(String[] args) {
+        logger.warn("invalid args:" + StringUtils.join(args, " "));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..a6b8a9f
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
@@ -0,0 +1,154 @@
+package org.apache.kylin.engine.streaming.monitor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class StreamingMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
+
+    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
+        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
+        StringBuilder stringBuilder = new StringBuilder();
+        String url = host + "/kylin/api/query";
+        PostMethod request = new PostMethod(url);
+        try {
+
+            request.addRequestHeader("Authorization", "Basic " + authorization);
+            request.addRequestHeader("Content-Type", "application/json");
+            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
+            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
+
+            int statusCode = new HttpClient().executeMethod(request);
+            String msg = Bytes.toString(request.getResponseBody());
+            stringBuilder.append("host:").append(host).append("\n");
+            stringBuilder.append("query:").append(query).append("\n");
+            stringBuilder.append("statusCode:").append(statusCode).append("\n");
+            if (statusCode == 200) {
+                title += "succeed";
+                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
+                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
+                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
+            } else {
+                title += "failed";
+                stringBuilder.append("response:").append(msg).append("\n");
+            }
+        } catch (Exception e) {
+            final StringWriter out = new StringWriter();
+            e.printStackTrace(new PrintWriter(out));
+            title += "failed";
+            stringBuilder.append(out.toString());
+        } finally {
+            request.releaseConnection();
+        }
+        logger.info("title:" + title);
+        logger.info("content:" + stringBuilder.toString());
+        sendMail(receivers, title, stringBuilder.toString());
+    }
+
+    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+            }
+        }
+        return gaps;
+    }
+
+    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        Preconditions.checkNotNull(cube);
+        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+        logger.info("totally " + segments.size() + " cubeSegments");
+        Collections.sort(segments);
+        return segments;
+    }
+
+    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else {
+                overlaps.add(Pair.newPair(first.getName(), second.getName()));
+            }
+        }
+        return overlaps;
+    }
+
+    public void checkCube(List<String> receivers, String cubeName, String host) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        List<Pair<Long, Long>> gaps = findGaps(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        StringBuilder content = new StringBuilder();
+        if (!gaps.isEmpty()) {
+            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
+                @Nullable
+                @Override
+                public String apply(Pair<Long, Long> input) {
+                    return parseInterval(input);
+                }
+            }), "\n")).append("\n");
+        }
+        if (!overlaps.isEmpty()) {
+            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+        }
+        if (content.length() > 0) {
+            logger.info(content.toString());
+            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
+        } else {
+            logger.info("no gaps or overlaps");
+        }
+    }
+
+    private String parseInterval(Pair<Long, Long> interval) {
+        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
+    }
+
+    private void sendMail(List<String> receivers, String title, String content) {
+        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
+        mailService.sendMail(receivers, title, content, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
index 718fc43..47db924 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
@@ -53,7 +53,7 @@ public class StreamingUtils {
     }
 
     public static IStreamingOutput getStreamingOutput(String streaming) {
-        return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStreamingOutput");
+        return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
     }
 
     public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/examples/test_case_data/localmeta/kafka/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/kafka_test.json b/examples/test_case_data/localmeta/kafka/kafka_test.json
new file mode 100644
index 0000000..5445417
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/kafka_test.json
@@ -0,0 +1,20 @@
+{
+  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
+  "name": "kafka_test",
+  "topic": "kafka_stream_test",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox.hortonworks.com",
+          "port": 6667
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
new file mode 100644
index 0000000..5fae898
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
@@ -0,0 +1,22 @@
+{
+  "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
+  "name": "test_streaming_table_cube",
+  "topic": "test_streaming_table_topic_xyz",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
+  "partition": 1,
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox",
+          "port": 6667
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
new file mode 100644
index 0000000..9e36201
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
@@ -0,0 +1,22 @@
+{
+  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
+  "name": "test_streaming_table_ii",
+  "topic": "test_streaming_table_topic_xyz",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "parserName": "org.apache.kylin.source.kafka.JsonStreamParser",
+  "partition": 1,
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox",
+          "port": 6667
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
index 98b4218..b358183 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
@@ -1,23 +1,7 @@
 {
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
   "name": "test_streaming_table_cube",
-  "topic": "test_streaming_table_topic_xyz",
-  "timeout": 60000,
-  "maxReadCount": 1000,
-  "bufferSize": 65536,
   "cubeName": "test_streaming_table_cube",
-  "parserName": "org.apache.kylin.streaming.TimedJsonStreamParser",
   "partition": 1,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox",
-          "port": 6667
-        }
-      ]
-    }
-  ]
+  "last_modified": 0
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
index e70153e..da1a765 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
@@ -1,23 +1,7 @@
 {
   "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
   "name": "test_streaming_table_ii",
-  "topic": "test_streaming_table_topic_xyz",
-  "timeout": 60000,
-  "maxReadCount": 1000,
-  "bufferSize": 65536,
   "iiName": "test_streaming_table_ii",
-  "parserName": "org.apache.kylin.streaming.JsonStreamParser",
   "partition": 1,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox",
-          "port": 6667
-        }
-      ]
-    }
-  ]
+  "last_modified": 0
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
new file mode 100644
index 0000000..ba337c8
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/streaming/SliceBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.engine.streaming.StreamingBatch;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.invertedindex.index.BatchSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ */
+public final class SliceBuilder {
+
+    private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+    private final BatchSliceMaker sliceMaker;
+    private final IIDesc iiDesc;
+    private final boolean useLocalDict;
+
+    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+        this.iiDesc = desc;
+        this.sliceMaker = new BatchSliceMaker(desc, shard);
+        this.useLocalDict = useLocalDict;
+    }
+
+    public Slice buildSlice(StreamingBatch microStreamBatch) {
+        final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+            @Nullable
+            @Override
+            public List<String> apply(@Nullable StreamingMessage input) {
+                return input.getData();
+            }
+        });
+        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+        TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+        return build(messages, tableRecordInfo, dictionaries);
+    }
+
+    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Nullable
+            @Override
+            public TableRecord apply(@Nullable List<String> input) {
+                TableRecord result = tableRecordInfo.createTableRecord();
+                for (int i = 0; i < input.size(); i++) {
+                    result.setValueString(i, input.get(i));
+                }
+                return result;
+            }
+        }));
+        slice.setLocalDictionaries(localDictionary);
+        return slice;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..5883493
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ */
+class ByteBufferBackedInputStream extends InputStream {
+
+    private ByteBuffer buf;
+
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] bytes, int off, int len)
+            throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(bytes, off, len);
+        return len;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index c84035f..be07290 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -34,18 +34,10 @@
 
 package org.apache.kylin.source.kafka;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -53,12 +45,9 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  */
@@ -104,15 +93,7 @@ public class KafkaConfigManager {
     }
 
     private String formatStreamingConfigPath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, int partition) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+        return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + name + ".json";
     }
 
     public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
@@ -125,7 +106,7 @@ public class KafkaConfigManager {
         }
     }
 
-    public KafkaConfig getStreamingConfig(String name) {
+    public KafkaConfig getKafkaConfig(String name) {
         try {
             return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
         } catch (IOException e) {
@@ -143,63 +124,6 @@ public class KafkaConfigManager {
         getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
     }
 
-    public long getOffset(String streaming, int shard) {
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        try {
-            final InputStream inputStream = getStore().getResource(resPath);
-            if (inputStream == null) {
-                return 0;
-            } else {
-                final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
-                return Long.parseLong(br.readLine());
-            }
-        } catch (Exception e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        }
-    }
-
-    public void updateOffset(String streaming, int shard, long offset) {
-        Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0");
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        try {
-            getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
-    public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        try {
-            final InputStream inputStream = getStore().getResource(resPath);
-            if (inputStream == null) {
-                return Collections.emptyMap();
-            }
-            final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
-            return result;
-        } catch (IOException e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        }
-    }
-
-    public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
-        List<Integer> partitions = Lists.newLinkedList(offset.keySet());
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            mapper.writeValue(baos, offset);
-            getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 393b8e7..9951f86 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -79,9 +79,10 @@ public class KafkaStreamingInput implements IStreamingInput {
     @Override
     public StreamingBatch getBatchWithTimeWindow(String streaming, int id, long startTime, long endTime) {
         try {
+            logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
             final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
-            final KafkaConfig kafkaConfig = kafkaConfigManager.getStreamingConfig(streaming);
+            final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
             final StreamingParser streamingParser = getStreamingParser(kafkaConfig);
             final ExecutorService executorService = Executors.newCachedThreadPool();
             final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
@@ -106,6 +107,7 @@ public class KafkaStreamingInput implements IStreamingInput {
                 }
             }
             final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
+            logger.info("finish to get streaming batch, total message count:" + messages.size());
             return new StreamingBatch(messages, timeRange);
         } catch (ReflectiveOperationException e) {
             throw new RuntimeException("failed to create instance of StreamingParser", e);
@@ -220,8 +222,8 @@ public class KafkaStreamingInput implements IStreamingInput {
         });
         if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
             Class clazz = Class.forName(kafkaConfig.getParserName());
-            Constructor constructor = clazz.getConstructor(List.class);
-            return (StreamingParser) constructor.newInstance(columns);
+            Constructor constructor = clazz.getConstructor(List.class, String.class);
+            return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties());
         } else {
             throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser");
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22ef792c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
new file mode 100644
index 0000000..307f73a
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+/**
+ */
+public final class StringStreamingParser implements StreamingParser {
+
+    public static final StringStreamingParser instance = new StringStreamingParser();
+
+    private StringStreamingParser() {
+    }
+
+    @Override
+    public StreamingMessage parse(MessageAndOffset kafkaMessage) {
+        final ByteBuffer payload = kafkaMessage.message().payload();
+        byte[] bytes = new byte[payload.limit()];
+        payload.get(bytes);
+        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object>emptyMap());
+    }
+
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+}