You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/02 10:40:11 UTC

[1/5] kylin git commit: KYLIN-1311 fix small bug

Repository: kylin
Updated Branches:
  refs/heads/helix-201601 2c66114ee -> 5410e62dd


KYLIN-1311 fix small bug

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f3987462
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f3987462
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f3987462

Branch: refs/heads/helix-201601
Commit: f3987462e6685f2ed7374cf5dc058b8d22fc835f
Parents: 2c66114
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jan 15 17:57:26 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jan 15 17:57:26 2016 +0800

----------------------------------------------------------------------
 .../engine/streaming/StreamingManager.java      | 11 +++++-----
 .../rest/controller/StreamingController.java    | 13 ++++++------
 .../helix/LeaderStandbyStateModelFactory.java   | 21 +++++++++++++++++---
 3 files changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f3987462/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..798fc3f 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -137,6 +137,12 @@ public class StreamingManager {
         return streamingMap.get(name);
     }
 
+    public StreamingConfig getStreamingConfigByCube(String cubeName) {
+        String streamingConfig = cubeName + "_streaming";
+        return getStreamingConfig(streamingConfig);
+    }
+
+
     public List<StreamingConfig> listAllStreaming() {
         return new ArrayList<>(streamingMap.values());
     }
@@ -168,11 +174,6 @@ public class StreamingManager {
         streamingMap.remove(streamingConfig.getName());
     }
 
-    public StreamingConfig getConfig(String name) {
-        name = name.toUpperCase();
-        return streamingMap.get(name);
-    }
-
     public void removeStreamingLocal(String streamingName) {
         streamingMap.removeLocal(streamingName);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f3987462/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 57831d5..fb806d1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -236,13 +236,11 @@ public class StreamingController extends BasicController {
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT})
+    @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
     @ResponseBody
-    public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
-        streamingBuildRequest.setStreaming(streamingName);
-        StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName);
-        Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "' is not found.");
-        String cubeName = streamingConfig.getCubeName();
+    public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
+        StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
         List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
         Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
         CubeInstance cube = cubes.get(0);
@@ -257,7 +255,8 @@ public class StreamingController extends BasicController {
             }
         }
 
-        streamingService.buildStream(streamingName, streamingBuildRequest);
+        streamingBuildRequest.setStreaming(streamingConfig.getName());
+        streamingService.buildStream(cubeName, streamingBuildRequest);
         streamingBuildRequest.setMessage("Build request is submitted successfully.");
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;

http://git-wip-us.apache.org/repos/asf/kylin/blob/f3987462/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index df23ea0..8614e8c 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -10,6 +10,10 @@ import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.MockJobLock;
@@ -48,7 +52,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
             logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
             try {
-                KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
                 DefaultScheduler scheduler = DefaultScheduler.createInstance();
                 scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
                 while (!scheduler.hasStarted()) {
@@ -89,11 +93,22 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
             String resourceName = message.getResourceId().stringify();
             Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
-            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")) + 1);
+            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
             String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
-            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
+            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
             String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
 
+            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            
+            final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
+            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            for (CubeSegment segment : cube.getSegments()) {
+                if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) {
+                    logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
+                    return;
+                }
+            }
+            
             KylinConfigBase.getKylinHome();
             String segmentId = start + "_" + end;
             String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;


[2/5] kylin git commit: KYLIN-1311 on the way

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
new file mode 100644
index 0000000..3182c16
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import java.io.File;
+
+/**
+ */
+public class TestBaseWithZookeeper extends LocalFileMetadataTestCase {
+    protected static final String zkAddress = "localhost:2199";
+    static ZkServer server;
+    static boolean zkStarted = false;
+
+    @BeforeClass
+    public static void setupResource() throws Exception {
+        staticCreateTestMetadata();
+
+        if (zkStarted == false) {
+            final File tmpDir = File.createTempFile("KylinTest", null);
+            FileUtil.fullyDelete(tmpDir);
+            tmpDir.mkdirs();
+            tmpDir.deleteOnExit();
+            server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() {
+                @Override
+                public void createDefaultNameSpace(ZkClient zkClient) {
+                }
+            }, 2199, 1000, 2000);
+
+            server.start();
+            zkStarted = true;
+            System.setProperty("kylin.zookeeper.address", zkAddress);
+        }
+
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+        if (server == null) {
+            server.shutdown();
+            zkStarted = false;
+            System.setProperty("kylin.zookeeper.address", "");
+        }
+
+        staticCleanupTestMetadata();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..b075387 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,7 +40,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
+import com.google.common.collect.Maps;
 import kafka.message.MessageAndOffset;
 
 import org.apache.commons.lang3.StringUtils;
@@ -102,7 +105,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
     @Override
     public StreamingMessage parse(MessageAndOffset messageAndOffset) {
         try {
-            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            ConcurrentMap<String, String> root = new ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            root.putAll(message);
             String tsStr = root.get(tsColName);
             //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
             //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/webapp/app/js/model/streamingModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/streamingModel.js b/webapp/app/js/model/streamingModel.js
index 31ebb8f..61560d7 100644
--- a/webapp/app/js/model/streamingModel.js
+++ b/webapp/app/js/model/streamingModel.js
@@ -35,7 +35,7 @@ KylinApp.service('StreamingModel', function () {
       "topic": "",
       "timeout": "60000",
       "bufferSize": "65536",
-      "parserName": "org.apache.kylin.streaming.TimedJsonStreamParser",
+      "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
       "margin": "300000",
       "clusters":[],
       "parserProperties":""


[3/5] kylin git commit: KYLIN-1311 on the way

Posted by sh...@apache.org.
KYLIN-1311 on the way

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/67e92021
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/67e92021
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/67e92021

Branch: refs/heads/helix-201601
Commit: 67e92021e55bcab8b76035a4d1b3205ad64bff79
Parents: f398746
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jan 22 11:01:48 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Fri Jan 22 11:02:07 2016 +0800

----------------------------------------------------------------------
 build/bin/kylin.sh                              |   8 +-
 .../test_case_data/localmeta/kylin.properties   |   2 +-
 server/pom.xml                                  |   1 +
 .../rest/controller/ClusterController.java      |  71 ++
 .../kylin/rest/controller/JobController.java    |  33 -
 .../rest/controller/StreamingController.java    |  68 +-
 .../kylin/rest/helix/HelixClusterAdmin.java     |  31 +-
 .../rest/helix/JobEngineTransitionHandler.java  |  70 ++
 .../helix/LeaderStandbyStateModelFactory.java   | 125 +---
 .../helix/StreamCubeBuildTransitionHandler.java | 107 +++
 .../rest/request/StreamingBuildRequest.java     |  29 +-
 .../security/KylinAuthenticationProvider.java   |   3 +-
 .../kylin/rest/service/StreamingService.java    |  34 +-
 .../rest/controller/UserControllerTest.java     |   9 -
 .../kylin/rest/helix/HelixClusterAdminTest.java |  22 +-
 .../kylin/rest/service/CacheServiceTest.java    | 720 +++++++++----------
 .../kylin/rest/service/ServiceTestBase.java     |  40 +-
 .../rest/service/TestBaseWithZookeeper.java     |  74 ++
 .../source/kafka/TimedJsonStreamParser.java     |   7 +-
 webapp/app/js/model/streamingModel.js           |   2 +-
 20 files changed, 826 insertions(+), 630 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index b152b67..8f39443 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -70,7 +70,7 @@ then
 
     if [ -z "$KYLIN_REST_ADDRESS" ]
     then
-        kylin_rest_address=`hostname`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2`
+        kylin_rest_address=`hostname -f`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2`
         echo "KYLIN_REST_ADDRESS not found, will use ${kylin_rest_address}"
     else
         echo "KYLIN_REST_ADDRESS is set to: $KYLIN_REST_ADDRESS"
@@ -154,12 +154,12 @@ then
         exit 0
     elif [ $2 == "stop" ]
     then
-        if [ ! -f "${KYLIN_HOME}/$3_$4" ]
+        if [ ! -f "${KYLIN_HOME}/logs/$3_$4" ]
             then
                 echo "streaming is not running, please check"
                 exit 1
             fi
-            pid=`cat ${KYLIN_HOME}/$3_$4`
+            pid=`cat ${KYLIN_HOME}/logs/$3_$4`
             if [ "$pid" = "" ]
             then
                 echo "streaming is not running, please check"
@@ -168,7 +168,7 @@ then
                 echo "stopping streaming:$pid"
                 kill $pid
             fi
-            rm ${KYLIN_HOME}/$3_$4
+            rm ${KYLIN_HOME}/logs/$3_$4
             exit 0
         else
             echo

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 978102f..41a9895 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -6,7 +6,7 @@
 kylin.owner=whoami@kylin.apache.org
 
 # List of web servers in use, this enables one web server instance to sync up with other servers.
-#kylin.rest.servers=localhost:7070
+kylin.rest.servers=localhost:7070
 
 # The metadata store in hbase
 kylin.metadata.url=

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index e945af3..a103d89 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -464,6 +464,7 @@
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
             <version>${zookeeper.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
new file mode 100644
index 0000000..97fff36
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobTimeFilterEnum;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.service.JobService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import java.util.*;
+
+/**
+ * 
+ */
+@Controller
+@RequestMapping(value = "cluster")
+public class ClusterController extends BasicController implements InitializingBean {
+    private static final Logger logger = LoggerFactory.getLogger(ClusterController.class);
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+     */
+    @Override
+    public void afterPropertiesSet() throws Exception {
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+        clusterAdmin.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                clusterAdmin.stop();
+            }
+        }));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 77d987f..a61635d 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -61,42 +61,9 @@ public class JobController extends BasicController implements InitializingBean {
      */
     @Override
     public void afterPropertiesSet() throws Exception {
-
         String timeZone = jobService.getConfig().getTimeZone();
         TimeZone tzone = TimeZone.getTimeZone(timeZone);
         TimeZone.setDefault(tzone);
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        if (kylinConfig.isClusterEnabled() == true) {
-            logger.info("Kylin cluster enabled, will use Helix/zookeeper to coordinate.");
-            final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
-            clusterAdmin.start();
-
-            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    clusterAdmin.stop();
-                }
-            }));
-        } else {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        DefaultScheduler scheduler = DefaultScheduler.createInstance();
-                        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
-                        if (!scheduler.hasStarted()) {
-                            logger.error("scheduler has not been started");
-                            System.exit(1);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }).start();
-        }
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index fb806d1..209c552 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -26,11 +26,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
@@ -45,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.AccessDeniedException;
-import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.*;
 
@@ -93,7 +88,6 @@ public class StreamingController extends BasicController {
         }
     }
 
-
     /**
      *
      * create Streaming Schema
@@ -105,7 +99,7 @@ public class StreamingController extends BasicController {
         //Update Model
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
-        if (streamingConfig == null ||kafkaConfig == null) {
+        if (streamingConfig == null || kafkaConfig == null) {
             return streamingRequest;
         }
         if (StringUtils.isEmpty(streamingConfig.getName())) {
@@ -124,7 +118,7 @@ public class StreamingController extends BasicController {
         try {
             kafkaConfig.setUuid(UUID.randomUUID().toString());
             kafkaConfigService.createKafkaConfig(kafkaConfig);
-        }catch (IOException e){
+        } catch (IOException e) {
             try {
                 streamingService.dropStreamingConfig(streamingConfig);
             } catch (IOException e1) {
@@ -139,7 +133,7 @@ public class StreamingController extends BasicController {
 
     @RequestMapping(value = "", method = { RequestMethod.PUT })
     @ResponseBody
-        public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+    public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
 
@@ -156,7 +150,7 @@ public class StreamingController extends BasicController {
         }
         try {
             kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig);
-        }catch (AccessDeniedException accessDeniedException) {
+        } catch (AccessDeniedException accessDeniedException) {
             throw new ForbiddenException("You don't have right to update this KafkaConfig.");
         } catch (Exception e) {
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
@@ -203,7 +197,6 @@ public class StreamingController extends BasicController {
         return desc;
     }
 
-
     private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
         KafkaConfig desc = null;
         try {
@@ -227,16 +220,14 @@ public class StreamingController extends BasicController {
         request.setMessage(message);
     }
 
-
-
     /**
      * Send a stream build request
      *
-     * @param cubeName Cube ID
+     * @param cubeName Cube Name
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
+    @RequestMapping(value = "/{cubeName}/build", method = { RequestMethod.PUT })
     @ResponseBody
     public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
         StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
@@ -244,27 +235,54 @@ public class StreamingController extends BasicController {
         List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
         Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
         CubeInstance cube = cubes.get(0);
-        if (streamingBuildRequest.isFillGap() == false) {
-            Preconditions.checkArgument(streamingBuildRequest.getEnd() > streamingBuildRequest.getStart(), "End time should be greater than start time.");
-            for (CubeSegment segment : cube.getSegments()) {
-                if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
-                    streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
-                    streamingBuildRequest.setSuccessful(false);
-                    return streamingBuildRequest;
-                }
+        if (streamingBuildRequest.getEnd() <= streamingBuildRequest.getStart()) {
+            streamingBuildRequest.setMessage("End time should be greater than start time.");streamingBuildRequest.setSuccessful(false);
+            return streamingBuildRequest;
+        }
+
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+                streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
+                streamingBuildRequest.setSuccessful(false);
+                return streamingBuildRequest;
             }
         }
 
         streamingBuildRequest.setStreaming(streamingConfig.getName());
-        streamingService.buildStream(cubeName, streamingBuildRequest);
+        streamingService.buildStream(cube, streamingBuildRequest);
         streamingBuildRequest.setMessage("Build request is submitted successfully.");
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;
 
     }
 
+    /**
+     * Send a stream fillGap request
+     *
+     * @param cubeName Cube Name
+     * @return
+     * @throws IOException
+     */
+    @RequestMapping(value = "/{cubeName}/fillgap", method = { RequestMethod.PUT })
+    @ResponseBody
+    public StreamingBuildRequest fillGap(@PathVariable String cubeName) {
+        StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
+        List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
+        Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
+        CubeInstance cube = cubes.get(0);
+
+        StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest();
+        streamingBuildRequest.setStreaming(streamingConfig.getName());
+        streamingService.fillGap(cube);
+        streamingBuildRequest.setMessage("FillGap request is submitted successfully.");
+        streamingBuildRequest.setSuccessful(true);
+        return streamingBuildRequest;
+
+    }
+
     public void setStreamingService(StreamingService streamingService) {
-        this.streamingService= streamingService;
+        this.streamingService = streamingService;
     }
 
     public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 9850e24..0758ef1 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -33,8 +33,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,14 +128,13 @@ public class HelixClusterAdmin {
 
     }
 
-    public void addStreamingJob(String streamingName, long start, long end) {
-        String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
-        if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
-            admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
-        } else {
-            logger.warn("Resource '" + resourceName + "' already exists in cluster, skip adding.");
+    public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) {
+        String resourceName = streamingBuildRequest.toResourceName();
+        if (admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+            logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add.");
+            admin.dropResource(clusterName, resourceName);
         }
-
+        admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
         admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
 
     }
@@ -150,7 +151,7 @@ public class HelixClusterAdmin {
      */
     protected void startInstance(String instanceName) throws Exception {
         participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
-        participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory());
+        participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory(this.kylinConfig));
         participantManager.connect();
         participantManager.addLiveInstanceChangeListener(new KylinClusterLiveInstanceChangeListener());
 
@@ -179,10 +180,12 @@ public class HelixClusterAdmin {
     public void stop() {
         if (participantManager != null) {
             participantManager.disconnect();
+            participantManager = null;
         }
 
         if (controllerManager != null) {
             controllerManager.disconnect();
+            controllerManager = null;
         }
     }
 
@@ -269,11 +272,13 @@ public class HelixClusterAdmin {
                 int indexOfUnderscore = instanceName.lastIndexOf("_");
                 instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1));
             }
-            String restServersInCluster = StringUtil.join(instanceRestAddresses, ",");
-            kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
-            System.setProperty("kylin.rest.servers", restServersInCluster);
-            logger.info("kylin.rest.servers update to " + restServersInCluster);
-            Broadcaster.clearCache();
+            if (instanceRestAddresses.size() > 0) {
+                String restServersInCluster = StringUtil.join(instanceRestAddresses, ",");
+                kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
+                System.setProperty("kylin.rest.servers", restServersInCluster);
+                logger.info("kylin.rest.servers update to " + restServersInCluster);
+                Broadcaster.clearCache();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
new file mode 100644
index 0000000..3ef04ee
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ */
+public class JobEngineTransitionHandler extends TransitionHandler {
+    private static final Logger logger = LoggerFactory.getLogger(JobEngineTransitionHandler.class);
+    private final KylinConfig kylinConfig;
+
+    private static ConcurrentMap<KylinConfig, JobEngineTransitionHandler> instanceMaps = Maps.newConcurrentMap();
+
+    private JobEngineTransitionHandler(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
+
+    public static JobEngineTransitionHandler getInstance(KylinConfig kylinConfig) {
+        Preconditions.checkNotNull(kylinConfig);
+        instanceMaps.putIfAbsent(kylinConfig, new JobEngineTransitionHandler(kylinConfig));
+        return instanceMaps.get(kylinConfig);
+    }
+
+    @Transition(to = "LEADER", from = "STANDBY")
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
+        try {
+            DefaultScheduler scheduler = DefaultScheduler.createInstance();
+            scheduler.init(new JobEngineConfig(this.kylinConfig), new MockJobLock());
+            while (!scheduler.hasStarted()) {
+                logger.error("scheduler has not been started");
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            logger.error("error start DefaultScheduler", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Transition(to = "STANDBY", from = "LEADER")
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
+        DefaultScheduler.destroyInstance();
+
+    }
+
+    @Transition(to = "STANDBY", from = "OFFLINE")
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
+
+    }
+
+    @Transition(to = "OFFLINE", from = "STANDBY")
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index 8614e8c..940c9c2 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -1,146 +1,35 @@
 package org.apache.kylin.rest.helix;
 
-import com.google.common.base.Preconditions;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.api.StateTransitionHandlerFactory;
 import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigBase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.lock.MockJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
 import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
 
 /**
  */
 public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
-    private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class);
+    private final KylinConfig kylinConfig;
+
+    public LeaderStandbyStateModelFactory(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
 
     @Override
     public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
         if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
-            return JobEngineStateModel.INSTANCE;
+            return JobEngineTransitionHandler.getInstance(kylinConfig);
         }
 
         if (partitionId.getResourceId().stringify().startsWith(RESOURCE_STREAME_CUBE_PREFIX)) {
-            return StreamCubeStateModel.INSTANCE;
+            return StreamCubeBuildTransitionHandler.getInstance(kylinConfig);
         }
 
         return null;
     }
 
-    public static class JobEngineStateModel extends TransitionHandler {
-
-        public static JobEngineStateModel INSTANCE = new JobEngineStateModel();
-
-        @Transition(to = "LEADER", from = "STANDBY")
-        public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
-            try {
-                final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-                DefaultScheduler scheduler = DefaultScheduler.createInstance();
-                scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
-                while (!scheduler.hasStarted()) {
-                    logger.error("scheduler has not been started");
-                    Thread.sleep(1000);
-                }
-            } catch (Exception e) {
-                logger.error("error start DefaultScheduler", e);
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Transition(to = "STANDBY", from = "LEADER")
-        public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
-            DefaultScheduler.destroyInstance();
-
-        }
-
-        @Transition(to = "STANDBY", from = "OFFLINE")
-        public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
-
-        }
-
-        @Transition(to = "OFFLINE", from = "STANDBY")
-        public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
-
-        }
-    }
-
-    public static class StreamCubeStateModel extends TransitionHandler {
-
-        public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel();
-
-        @Transition(to = "LEADER", from = "STANDBY")
-        public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
-            String resourceName = message.getResourceId().stringify();
-            Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
-            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
-            String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
-            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
-            String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
-
-            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            
-            final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
-            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-            for (CubeSegment segment : cube.getSegments()) {
-                if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) {
-                    logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
-                    return;
-                }
-            }
-            
-            KylinConfigBase.getKylinHome();
-            String segmentId = start + "_" + end;
-            String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;
-            logger.info("Executing: " + cmd);
-            try {
-                String line;
-                Process p = Runtime.getRuntime().exec(cmd);
-                BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
-                while ((line = input.readLine()) != null) {
-                    logger.info(line);
-                }
-                input.close();
-            } catch (IOException err) {
-                logger.error("Error happens during build streaming  '" + resourceName + "'", err);
-                throw new RuntimeException(err);
-            }
-
-        }
-
-        @Transition(to = "STANDBY", from = "LEADER")
-        public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-
-        }
-
-        @Transition(to = "STANDBY", from = "OFFLINE")
-        public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
-
-        }
-
-        @Transition(to = "OFFLINE", from = "STANDBY")
-        public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
-
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
new file mode 100644
index 0000000..44d8302
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
@@ -0,0 +1,107 @@
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ */
+public class StreamCubeBuildTransitionHandler extends TransitionHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamCubeBuildTransitionHandler.class);
+
+    private static ConcurrentMap<KylinConfig, StreamCubeBuildTransitionHandler> instanceMaps = Maps.newConcurrentMap();
+    private final KylinConfig kylinConfig;
+
+    private StreamCubeBuildTransitionHandler(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
+
+    public static StreamCubeBuildTransitionHandler getInstance(KylinConfig kylinConfig) {
+        Preconditions.checkNotNull(kylinConfig);
+        instanceMaps.putIfAbsent(kylinConfig, new StreamCubeBuildTransitionHandler(kylinConfig));
+        return instanceMaps.get(kylinConfig);
+    }
+
+    @Transition(to = "LEADER", from = "STANDBY")
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+
+        final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()).getCubeName();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+                logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
+                return;
+            }
+        }
+
+        KylinConfigBase.getKylinHome();
+        String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
+        String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + streamingBuildRequest.getEnd() + " -streaming " + streamingBuildRequest.getStreaming();
+        logger.info("Executing: " + cmd);
+        try {
+            String line;
+            Process p = Runtime.getRuntime().exec(cmd);
+            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            while ((line = input.readLine()) != null) {
+                logger.info(line);
+            }
+            input.close();
+        } catch (IOException err) {
+            logger.error("Error happens during build streaming  '" + resourceName + "'", err);
+            throw new RuntimeException(err);
+        }
+
+    }
+
+    @Transition(to = "STANDBY", from = "LEADER")
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+        KylinConfigBase.getKylinHome();
+        String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
+        String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId;
+        logger.info("Executing: " + cmd);
+        try {
+            String line;
+            Process p = Runtime.getRuntime().exec(cmd);
+            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            while ((line = input.readLine()) != null) {
+                logger.info(line);
+            }
+            input.close();
+        } catch (IOException err) {
+            logger.error("Error happens during build streaming  '" + resourceName + "'", err);
+            throw new RuntimeException(err);
+        }
+    }
+
+    @Transition(to = "STANDBY", from = "OFFLINE")
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+
+    }
+
+    @Transition(to = "OFFLINE", from = "STANDBY")
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
index e06a06c..dcf91fd 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
@@ -18,15 +18,28 @@
 
 package org.apache.kylin.rest.request;
 
+import com.google.common.base.Preconditions;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+
+import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
+
 public class StreamingBuildRequest {
 
     private String streaming;
     private long start;
     private long end;
-    private boolean fillGap;
     private String message;
     private boolean successful;
 
+    public StreamingBuildRequest() {
+    }
+
+    public StreamingBuildRequest(String streaming, long start, long end) {
+        this.streaming = streaming;
+        this.start = start;
+        this.end = end;
+    }
+
     public String getStreaming() {
         return streaming;
     }
@@ -67,11 +80,17 @@ public class StreamingBuildRequest {
         this.end = end;
     }
 
-    public boolean isFillGap() {
-        return fillGap;
+    public String toResourceName() {
+        return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming + "_" + start + "_" + end;
     }
 
-    public void setFillGap(boolean fillGap) {
-        this.fillGap = fillGap;
+    public static StreamingBuildRequest fromResourceName(String resourceName) {
+        Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
+        long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
+        String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
+        long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
+        String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
+
+        return new StreamingBuildRequest(streamingConfig, start, end);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
index 1f147ef..b8dcd43 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
@@ -72,7 +72,8 @@ public class KylinAuthenticationProvider implements AuthenticationProvider {
             }
 
             logger.debug("Authenticated user " + authed.toString());
-            
+
+            SecurityContextHolder.getContext().setAuthentication(authed);
             UserDetails user;
             
             if (authed.getDetails() == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index da20949..7c2cc48 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -21,7 +21,6 @@ package org.apache.kylin.rest.service;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
@@ -54,8 +53,8 @@ public class StreamingService extends BasicService {
         if (null == cubeInstance) {
             streamingConfigs = getStreamingManager().listAllStreaming();
         } else {
-            for(StreamingConfig config : getStreamingManager().listAllStreaming()){
-                if(cubeInstance.getName().equals(config.getCubeName())){
+            for (StreamingConfig config : getStreamingManager().listAllStreaming()) {
+                if (cubeInstance.getName().equals(config.getCubeName())) {
                     streamingConfigs.add(config);
                 }
             }
@@ -84,34 +83,35 @@ public class StreamingService extends BasicService {
         if (getStreamingManager().getStreamingConfig(config.getName()) != null) {
             throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
         }
-        StreamingConfig streamingConfig =  getStreamingManager().saveStreamingConfig(config);
+        StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config);
         return streamingConfig;
     }
 
-//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    //    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
     public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
         return getStreamingManager().updateStreamingConfig(config);
     }
 
-//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    //    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
     public void dropStreamingConfig(StreamingConfig config) throws IOException {
         getStreamingManager().removeStreamingConfig(config);
     }
 
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
+    public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) {
+        HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
+        clusterAdmin.addStreamingJob(streamingBuildRequest);
+    }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
-    public void buildStream(String cube, StreamingBuildRequest streamingBuildRequest) {
+    public void fillGap(CubeInstance cube) {
         HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
-        if (streamingBuildRequest.isFillGap()) {
-            final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingBuildRequest.getStreaming());
-            final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
-            logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
-            for (Pair<Long, Long> gap : gaps) {
-                clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), gap.getFirst(), gap.getSecond());
-            }
-        } else {
-            clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), streamingBuildRequest.getStart(), streamingBuildRequest.getEnd());
+        final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfigByCube(cube.getName());
+        final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+        logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
+        for (Pair<Long, Long> gap : gaps) {
+            StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(streamingConfig.getName(), gap.getFirst(), gap.getSecond());
+            clusterAdmin.addStreamingJob(streamingBuildRequest);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
index ab77a9a..fe0e67a 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
@@ -41,15 +41,6 @@ public class UserControllerTest extends ServiceTestBase {
 
     private UserController userController;
 
-    @BeforeClass
-    public static void setupResource() {
-        staticCreateTestMetadata();
-        List<GrantedAuthority> authorities = new ArrayList<GrantedAuthority>();
-        User user = new User("ADMIN", "ADMIN", authorities);
-        Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN");
-        SecurityContextHolder.getContext().setAuthentication(authentication);
-    }
-
     @Before
     public void setup() throws Exception {
         super.setup();

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
index 594e76b5..1c8b779 100644
--- a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.rest.service.TestBaseWithZookeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,10 +40,7 @@ import static org.junit.Assert.assertTrue;
 
 /**
 */
-public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
-
-    String zkAddress = "localhost:2199";
-    ZkServer server;
+public class HelixClusterAdminTest extends TestBaseWithZookeeper {
 
     HelixClusterAdmin clusterAdmin1;
     HelixClusterAdmin clusterAdmin2;
@@ -52,21 +50,8 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
 
     @Before
     public void setup() throws Exception {
-        createTestMetadata();
-        // start zookeeper on localhost
-        final File tmpDir = File.createTempFile("HelixClusterAdminTest", null); 
-        FileUtil.fullyDelete(tmpDir);
-        tmpDir.mkdirs();
-        server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() {
-            @Override
-            public void createDefaultNameSpace(ZkClient zkClient) {
-            }
-        }, 2199);
-        server.start();
-
         kylinConfig = this.getTestConfig();
         kylinConfig.setRestAddress("localhost:7070");
-        kylinConfig.setZookeeperAddress(zkAddress);
         kylinConfig.setClusterName(CLUSTER_NAME);
         
         final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
@@ -105,7 +90,7 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
         
         // 3. shutdown the first instance
         clusterAdmin1.stop();
-        clusterAdmin1 = null;
+//        clusterAdmin1 = null;
         Thread.sleep(1000);
         assertTrue(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
         assertEquals(1, kylinConfig.getRestServers().length);
@@ -133,7 +118,6 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
             clusterAdmin2.stop();
         }
         
-        server.shutdown();
         cleanupTestMetadata();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 763bebe..8193884 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -1,366 +1,354 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.service;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class CacheServiceTest extends LocalFileMetadataTestCase {
-
-    private static Server server;
-
-    private static String ZK_ADDRESS = "localhost:2199";
-    
-    private static KylinConfig configA;
-    private static KylinConfig configB;
-
-    private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class);
-
-    private static AtomicLong counter = new AtomicLong();
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        staticCreateTestMetadata();
-        configA = KylinConfig.getInstanceFromEnv();
-        configA.setProperty("kylin.rest.servers", "localhost:7070");
-        configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
-        configB.setProperty("kylin.rest.servers", "localhost:7070");
-        configB.setMetadataUrl("../examples/test_metadata");
-
-        server = new Server(7070);
-        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath("/");
-        server.setHandler(context);
-
-        final CacheService serviceA = new CacheService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configA;
-            }
-        };
-        final CacheService serviceB = new CacheService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configB;
-            }
-        };
-
-        final CubeService cubeServiceA = new CubeService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configA;
-            }
-        };
-        final CubeService cubeServiceB = new CubeService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configB;
-            }
-        };
-
-        serviceA.setCubeService(cubeServiceA);
-        serviceA.initCubeChangeListener();
-        serviceB.setCubeService(cubeServiceB);
-        serviceB.initCubeChangeListener();
-
-        context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
-            @Override
-            public void handle(String type, String name, String event) {
-
-                Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
-                Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
-                final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;
-                logger.info(log);
-                try {
-                    switch (wipeEvent) {
-                    case CREATE:
-                    case UPDATE:
-                        serviceA.rebuildCache(wipeType, name);
-                        serviceB.rebuildCache(wipeType, name);
-                        break;
-                    case DROP:
-                        serviceA.removeCache(wipeType, name);
-                        serviceB.removeCache(wipeType, name);
-                        break;
-                    default:
-                        throw new RuntimeException("invalid type:" + wipeEvent);
-                    }
-                } finally {
-                    counter.incrementAndGet();
-                }
-            }
-        })), "/");
-
-        server.start();
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        server.stop();
-        cleanAfterClass();
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        counter.set(0L);
-        createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    private void waitForCounterAndClear(long count) {
-        int retryTimes = 0;
-        while ((!counter.compareAndSet(count, 0L))) {
-            if (++retryTimes > 30) {
-                throw new RuntimeException("timeout");
-            }
-            try {
-                Thread.sleep(100L);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private static CubeManager getCubeManager(KylinConfig config) throws Exception {
-        return CubeManager.getInstance(config);
-    }
-
-    private static ProjectManager getProjectManager(KylinConfig config) throws Exception {
-        return ProjectManager.getInstance(config);
-    }
-
-    private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception {
-        return CubeDescManager.getInstance(config);
-    }
-
-    private static MetadataManager getMetadataManager(KylinConfig config) throws Exception {
-        return MetadataManager.getInstance(config);
-    }
-
-    @Test
-    public void testBasic() throws Exception {
-        assertTrue(!configA.equals(configB));
-
-        assertNotNull(getCubeManager(configA));
-        assertNotNull(getCubeManager(configB));
-        assertNotNull(getCubeDescManager(configA));
-        assertNotNull(getCubeDescManager(configB));
-        assertNotNull(getProjectManager(configB));
-        assertNotNull(getProjectManager(configB));
-        assertNotNull(getMetadataManager(configB));
-        assertNotNull(getMetadataManager(configB));
-
-        assertTrue(!getCubeManager(configA).equals(getCubeManager(configB)));
-        assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB)));
-        assertTrue(!getProjectManager(configA).equals(getProjectManager(configB)));
-        assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB)));
-
-        assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size());
-    }
-
-    @Test
-    public void testCubeCRUD() throws Exception {
-        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
-        broadcaster.getCounterAndClear();
-
-        getStore().deleteResource("/cube/a_whole_new_cube.json");
-
-        //create cube
-
-        final String cubeName = "a_whole_new_cube";
-        final CubeManager cubeManager = getCubeManager(configA);
-        final CubeManager cubeManagerB = getCubeManager(configB);
-        final ProjectManager projectManager = getProjectManager(configA);
-        final ProjectManager projectManagerB = getProjectManager(configB);
-        final CubeDescManager cubeDescManager = getCubeDescManager(configA);
-        final CubeDescManager cubeDescManagerB = getCubeDescManager(configB);
-        final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc");
-
-        assertTrue(cubeManager.getCube(cubeName) == null);
-        assertTrue(cubeManagerB.getCube(cubeName) == null);
-        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null);
-        //one for cube update, one for project update
-        assertEquals(2, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(2);
-
-        assertNotNull(cubeManager.getCube(cubeName));
-        assertNotNull(cubeManagerB.getCube(cubeName));
-        assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-
-        //update cube
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        assertEquals(0, cube.getSegments().size());
-        assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
-        CubeSegment segment = new CubeSegment();
-        segment.setName("test_segment");
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToAddSegs(segment);
-        cube = cubeManager.updateCube(cubeBuilder);
-        //one for cube update
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());
-        assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName());
-
-        //delete cube
-        cubeManager.dropCube(cubeName, false);
-        //one for cube update, one for project update
-        assertEquals(2, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(2);
-
-        assertTrue(cubeManager.getCube(cubeName) == null);
-        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        assertTrue(cubeManagerB.getCube(cubeName) == null);
-        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-
-        final String cubeDescName = "test_cube_desc";
-        cubeDesc.setName(cubeDescName);
-        cubeDesc.setLastModified(0);
-        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
-        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
-        cubeDescManager.createCubeDesc(cubeDesc);
-        //one for add cube desc
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertNotNull(cubeDescManager.getCubeDesc(cubeDescName));
-        assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName));
-
-        cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", "test@email"));
-        cubeDescManager.updateCubeDesc(cubeDesc);
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList());
-
-        cubeDescManager.removeCubeDesc(cubeDesc);
-        //one for add cube desc
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
-        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
-
-        getStore().deleteResource("/cube/a_whole_new_cube.json");
-    }
-
-    private TableDesc createTestTableDesc() {
-        TableDesc tableDesc = new TableDesc();
-        tableDesc.setDatabase("TEST_DB");
-        tableDesc.setName("TEST_TABLE");
-        tableDesc.setUuid(UUID.randomUUID().toString());
-        tableDesc.setLastModified(0);
-        return tableDesc;
-    }
-
-    @Test
-    public void testMetaCRUD() throws Exception {
-        final MetadataManager metadataManager = MetadataManager.getInstance(configA);
-        final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
-        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
-        broadcaster.getCounterAndClear();
-
-        TableDesc tableDesc = createTestTableDesc();
-        assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null);
-        assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null);
-        metadataManager.saveSourceTable(tableDesc);
-        //only one for table insert
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity()));
-        assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity()));
-
-        final String dataModelName = "test_data_model";
-        DataModelDesc dataModelDesc = metadataManager.getDataModelDesc("test_kylin_left_join_model_desc");
-        dataModelDesc.setName(dataModelName);
-        dataModelDesc.setLastModified(0);
-        assertTrue(metadataManager.getDataModelDesc(dataModelName) == null);
-        assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null);
-
-        dataModelDesc.setName(dataModelName);
-        metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN");
-        //one for data model creation, one for project meta update
-        assertEquals(2, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(2);
-        assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName());
-
-        final LookupDesc[] lookups = dataModelDesc.getLookups();
-        assertTrue(lookups.length > 0);
-        dataModelDesc.setLookups(lookups);
-        metadataManager.updateDataModelDesc(dataModelDesc);
-        //only one for data model update
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length);
-
-    }
-
-    private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) {
-        for (IRealization realization : realizations) {
-            if (realization.getType() == type && realization.getName().equals(name)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// * 
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// * 
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+//*/
+//
+//package org.apache.kylin.rest.service;
+//
+//import org.apache.kylin.common.KylinConfig;
+//import org.apache.kylin.common.restclient.Broadcaster;
+//import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+//import org.apache.kylin.cube.*;
+//import org.apache.kylin.cube.model.CubeDesc;
+//import org.apache.kylin.metadata.MetadataManager;
+//import org.apache.kylin.metadata.model.DataModelDesc;
+//import org.apache.kylin.metadata.model.LookupDesc;
+//import org.apache.kylin.metadata.model.TableDesc;
+//import org.apache.kylin.metadata.project.ProjectInstance;
+//import org.apache.kylin.metadata.project.ProjectManager;
+//import org.apache.kylin.metadata.realization.IRealization;
+//import org.apache.kylin.metadata.realization.RealizationType;
+//import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet;
+//import org.eclipse.jetty.server.Server;
+//import org.eclipse.jetty.servlet.ServletContextHandler;
+//import org.eclipse.jetty.servlet.ServletHolder;
+//import org.junit.*;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.util.Arrays;
+//import java.util.Set;
+//import java.util.UUID;
+//import java.util.concurrent.atomic.AtomicLong;
+//
+//import static org.junit.Assert.*;
+//
+///**
+// */
+//public class CacheServiceTest extends LocalFileMetadataTestCase {
+//
+//    private static Server server;
+//
+//    private static String ZK_ADDRESS = "localhost:2199";
+//
+//    private static KylinConfig configA;
+//    private static KylinConfig configB;
+//
+//    private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class);
+//
+//    private static AtomicLong counter = new AtomicLong();
+//
+//    @BeforeClass
+//    public static void beforeClass() throws Exception {
+//        staticCreateTestMetadata();
+//        configA = KylinConfig.getInstanceFromEnv();
+//        configA.setProperty("kylin.rest.servers", "localhost:7070");
+//        configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
+//        configB.setProperty("kylin.rest.servers", "localhost:7070");
+//        configB.setMetadataUrl("../examples/test_metadata");
+//
+//        server = new Server(7070);
+//        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+//        context.setContextPath("/");
+//        server.setHandler(context);
+//
+//        final CacheService serviceA = new CacheService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configA;
+//            }
+//        };
+//        final CacheService serviceB = new CacheService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configB;
+//            }
+//        };
+//
+//        final CubeService cubeServiceA = new CubeService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configA;
+//            }
+//        };
+//        final CubeService cubeServiceB = new CubeService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configB;
+//            }
+//        };
+//
+//        serviceA.setCubeService(cubeServiceA);
+//        serviceA.initCubeChangeListener();
+//        serviceB.setCubeService(cubeServiceB);
+//        serviceB.initCubeChangeListener();
+//
+//        context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
+//            @Override
+//            public void handle(String type, String name, String event) {
+//
+//                Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
+//                Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
+//                final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;
+//                logger.info(log);
+//                try {
+//                    switch (wipeEvent) {
+//                    case CREATE:
+//                    case UPDATE:
+//                        serviceA.rebuildCache(wipeType, name);
+//                        serviceB.rebuildCache(wipeType, name);
+//                        break;
+//                    case DROP:
+//                        serviceA.removeCache(wipeType, name);
+//                        serviceB.removeCache(wipeType, name);
+//                        break;
+//                    default:
+//                        throw new RuntimeException("invalid type:" + wipeEvent);
+//                    }
+//                } finally {
+//                    counter.incrementAndGet();
+//                }
+//            }
+//        })), "/");
+//
+//        server.start();
+//    }
+//
+//    @AfterClass
+//    public static void afterClass() throws Exception {
+//        server.stop();
+//        cleanAfterClass();
+//    }
+//
+//    @Before
+//    public void setUp() throws Exception {
+//        counter.set(0L);
+//        createTestMetadata();
+//    }
+//
+//    @After
+//    public void after() throws Exception {
+//        cleanupTestMetadata();
+//    }
+//
+//    private void waitForCounterAndClear(long count) {
+//        int retryTimes = 0;
+//        while ((!counter.compareAndSet(count, 0L))) {
+//            if (++retryTimes > 30) {
+//                throw new RuntimeException("timeout");
+//            }
+//            try {
+//                Thread.sleep(100L);
+//            } catch (InterruptedException e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    private static CubeManager getCubeManager(KylinConfig config) throws Exception {
+//        return CubeManager.getInstance(config);
+//    }
+//
+//    private static ProjectManager getProjectManager(KylinConfig config) throws Exception {
+//        return ProjectManager.getInstance(config);
+//    }
+//
+//    private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception {
+//        return CubeDescManager.getInstance(config);
+//    }
+//
+//    private static MetadataManager getMetadataManager(KylinConfig config) throws Exception {
+//        return MetadataManager.getInstance(config);
+//    }
+//
+//    @Test
+//    public void testBasic() throws Exception {
+//        assertTrue(!configA.equals(configB));
+//
+//        assertNotNull(getCubeManager(configA));
+//        assertNotNull(getCubeManager(configB));
+//        assertNotNull(getCubeDescManager(configA));
+//        assertNotNull(getCubeDescManager(configB));
+//        assertNotNull(getProjectManager(configB));
+//        assertNotNull(getProjectManager(configB));
+//        assertNotNull(getMetadataManager(configB));
+//        assertNotNull(getMetadataManager(configB));
+//
+//        assertTrue(!getCubeManager(configA).equals(getCubeManager(configB)));
+//        assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB)));
+//        assertTrue(!getProjectManager(configA).equals(getProjectManager(configB)));
+//        assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB)));
+//
+//        assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size());
+//    }
+//
+//    @Test
+//    public void testCubeCRUD() throws Exception {
+//        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
+//        broadcaster.getCounterAndClear();
+//
+//        getStore().deleteResource("/cube/a_whole_new_cube.json");
+//
+//        //create cube
+//
+//        final String cubeName = "a_whole_new_cube";
+//        final CubeManager cubeManager = getCubeManager(configA);
+//        final CubeManager cubeManagerB = getCubeManager(configB);
+//        final ProjectManager projectManager = getProjectManager(configA);
+//        final ProjectManager projectManagerB = getProjectManager(configB);
+//        final CubeDescManager cubeDescManager = getCubeDescManager(configA);
+//        final CubeDescManager cubeDescManagerB = getCubeDescManager(configB);
+//        final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc");
+//
+//        assertTrue(cubeManager.getCube(cubeName) == null);
+//        assertTrue(cubeManagerB.getCube(cubeName) == null);
+//        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null);
+//        //one for cube update, one for project update
+//        assertEquals(2, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(2);
+//
+//        assertNotNull(cubeManager.getCube(cubeName));
+//        assertNotNull(cubeManagerB.getCube(cubeName));
+//        assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//
+//        //update cube
+//        CubeInstance cube = cubeManager.getCube(cubeName);
+//        assertEquals(0, cube.getSegments().size());
+//        assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
+//        CubeSegment segment = new CubeSegment();
+//        segment.setName("test_segment");
+//        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+//        cubeBuilder.setToAddSegs(segment);
+//        cube = cubeManager.updateCube(cubeBuilder);
+//        //one for cube update
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());
+//        assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName());
+//
+//        //delete cube
+//        cubeManager.dropCube(cubeName, false);
+//        //one for cube update, one for project update
+//        assertEquals(2, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(2);
+//
+//        assertTrue(cubeManager.getCube(cubeName) == null);
+//        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        assertTrue(cubeManagerB.getCube(cubeName) == null);
+//        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//
+//        final String cubeDescName = "test_cube_desc";
+//        cubeDesc.setName(cubeDescName);
+//        cubeDesc.setLastModified(0);
+//        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
+//        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
+//        cubeDescManager.createCubeDesc(cubeDesc);
+//        //one for add cube desc
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertNotNull(cubeDescManager.getCubeDesc(cubeDescName));
+//        assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName));
+//
+//        cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", "test@email"));
+//        cubeDescManager.updateCubeDesc(cubeDesc);
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList());
+//
+//        cubeDescManager.removeCubeDesc(cubeDesc);
+//        //one for add cube desc
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
+//        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
+//
+//        getStore().deleteResource("/cube/a_whole_new_cube.json");
+//    }
+//
+//    private TableDesc createTestTableDesc() {
+//        TableDesc tableDesc = new TableDesc();
+//        tableDesc.setDatabase("TEST_DB");
+//        tableDesc.setName("TEST_TABLE");
+//        tableDesc.setUuid(UUID.randomUUID().toString());
+//        tableDesc.setLastModified(0);
+//        return tableDesc;
+//    }
+//
+//    @Test
+//    public void testMetaCRUD() throws Exception {
+//        final MetadataManager metadataManager = MetadataManager.getInstance(configA);
+//        final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
+//        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
+//        broadcaster.getCounterAndClear();
+//
+//        TableDesc tableDesc = createTestTableDesc();
+//        assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null);
+//        assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null);
+//        metadataManager.saveSourceTable(tableDesc);
+//        //only one for table insert
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity()));
+//        assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity()));
+//
+//        final String dataModelName = "test_data_model";
+//        DataModelDesc dataModelDesc = metadataManager.getDataModelDesc("test_kylin_left_join_model_desc");
+//        dataModelDesc.setName(dataModelName);
+//        dataModelDesc.setLastModified(0);
+//        assertTrue(metadataManager.getDataModelDesc(dataModelName) == null);
+//        assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null);
+//
+//        dataModelDesc.setName(dataModelName);
+//        metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN");
+//        //one for data model creation, one for project meta update
+//        assertEquals(2, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(2);
+//        assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName());
+//
+//        final LookupDesc[] lookups = dataModelDesc.getLookups();
+//        assertTrue(lookups.length > 0);
+//        dataModelDesc.setLookups(lookups);
+//        metadataManager.updateDataModelDesc(dataModelDesc);
+//        //only one for data model update
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length);
+//
+//    }
+//
+//    private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) {
+//        for (IRealization realization : realizations) {
+//            if (realization.getType() == type && realization.getName().equals(name)) {
+//                return true;
+//            }
+//        }
+//        return false;
+//    }
+//
+//}

http://git-wip-us.apache.org/repos/asf/kylin/blob/67e92021/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index f8dc945..ca4fe39 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -18,6 +18,12 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
@@ -26,42 +32,42 @@ import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.junit.*;
 import org.junit.runner.RunWith;
 import org.springframework.security.authentication.TestingAuthenticationToken;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * @author xduo
  */
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:kylinSecurity.xml" })
 @ActiveProfiles("testing")
-public class ServiceTestBase extends LocalFileMetadataTestCase {
-
-    @BeforeClass
-    public static void setupResource() throws Exception {
-        staticCreateTestMetadata();
-        Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", "ROLE_ADMIN");
-        SecurityContextHolder.getContext().setAuthentication(authentication);
-    }
-
-    @AfterClass
-    public static void tearDownResource() {
-    }
+public class ServiceTestBase extends TestBaseWithZookeeper {
 
     @Before
     public void setup() throws Exception {
         this.createTestMetadata();
 
+        UserService.UserGrantedAuthority userGrantedAuthority = new UserService.UserGrantedAuthority();
+        userGrantedAuthority.setAuthority("ROLE_ADMIN");
+        UserDetails user = new User("ADMIN", "skippped-ldap", Lists.newArrayList(userGrantedAuthority));
+        Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN");
+        SecurityContextHolder.getContext().setAuthentication(authentication);
+        KylinConfig kylinConfig = this.getTestConfig();
+        kylinConfig.setRestAddress("localhost:7070");
+
         MetadataManager.clearCache();
         CubeDescManager.clearCache();
         CubeManager.clearCache();


[5/5] kylin git commit: KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure

Posted by sh...@apache.org.
KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5410e62d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5410e62d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5410e62d

Branch: refs/heads/helix-201601
Commit: 5410e62dd50e7684f082cfb95ffb2e3c66647c0a
Parents: 710860d
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 2 17:34:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Feb 2 17:35:40 2016 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/CompoundCuboidWriter.java  | 57 ++++++++++++++
 .../kylin/cube/inmemcubing/ICuboidWriter.java   |  4 +-
 .../kylin/job/constant/ExecutableConstants.java |  1 +
 .../kylin/engine/mr/steps/KVGTRecordWriter.java | 81 ++++++++++++++++++++
 .../mr/steps/MapContextGTRecordWriter.java      | 69 ++---------------
 .../streaming/cube/StreamingCubeBuilder.java    | 12 ++-
 .../storage/hbase/steps/HBaseCuboidWriter.java  | 24 +++---
 .../hbase/steps/HBaseMROutput2Transition.java   |  2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  2 +-
 .../hbase/steps/HBaseStreamingOutput.java       |  8 +-
 .../hbase/steps/SequenceFileCuboidWriter.java   | 75 ++++++++++++++++++
 11 files changed, 254 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
new file mode 100644
index 0000000..46eef50
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public class CompoundCuboidWriter implements ICuboidWriter {
+
+    private Iterable<ICuboidWriter> cuboidWriters;
+
+    public CompoundCuboidWriter(Iterable<ICuboidWriter> cuboidWriters) {
+        this.cuboidWriters = cuboidWriters;
+
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.write(cuboidId, record);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.flush();
+        }
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.close();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 9e26e5e..e6cfa02 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -27,7 +27,7 @@ public interface ICuboidWriter {
 
     void write(long cuboidId, GTRecord record) throws IOException;
 
-    void flush();
+    void flush() throws IOException;
     
-    void close();
+    void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ba50880..d370b0d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,6 +56,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+    public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
 
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
new file mode 100644
index 0000000..e201705
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public abstract class KVGTRecordWriter implements ICuboidWriter {
+
+    private static final Log logger = LogFactory.getLog(KVGTRecordWriter.class);
+    private Long lastCuboidId;
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+
+    private AbstractRowKeyEncoder rowKeyEncoder;
+    private int dimensions;
+    private int measureCount;
+    private byte[] keyBuf;
+    private int[] measureColumnsIndex;
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+    private long cuboidRowCount = 0;
+
+    //for shard
+
+    public KVGTRecordWriter(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.measureCount = cubeDesc.getMeasures().size();
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+
+        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+            if (lastCuboidId != null) {
+                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+                cuboidRowCount = 0;
+            }
+            // output another cuboid
+            initVariables(cuboidId);
+            lastCuboidId = cuboidId;
+        }
+
+        cuboidRowCount++;
+        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
+
+        //output measures
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        outputKey.set(keyBuf, 0, keyBuf.length);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        writeAsKeyValue(outputKey, outputValue);
+    }
+
+    protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException;
+
+    private void initVariables(Long cuboidId) {
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
+        keyBuf = rowKeyEncoder.createBuf();
+
+        dimensions = Long.bitCount(cuboidId);
+        measureColumnsIndex = new int[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 8416d95..bee152b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -1,76 +1,32 @@
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
 
 /**
  */
-public class MapContextGTRecordWriter implements ICuboidWriter {
+public class MapContextGTRecordWriter extends KVGTRecordWriter {
 
     private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
     protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
-    private Long lastCuboidId;
-    protected CubeSegment cubeSegment;
-    protected CubeDesc cubeDesc;
-
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private int dimensions;
-    private int measureCount;
-    private byte[] keyBuf;
-    private int[] measureColumnsIndex;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-    private long cuboidRowCount = 0;
-
-    //for shard
 
     public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        super(cubeDesc, cubeSegment);
         this.mapContext = mapContext;
-        this.cubeDesc = cubeDesc;
-        this.cubeSegment = cubeSegment;
-        this.measureCount = cubeDesc.getMeasures().size();
     }
 
     @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-
-        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
-            if (lastCuboidId != null) {
-                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
-                cuboidRowCount = 0;
-            }
-            // output another cuboid
-            initVariables(cuboidId);
-            lastCuboidId = cuboidId;
-        }
-
-        cuboidRowCount++;
-        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
-
-        //output measures
-        valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
-
-        outputKey.set(keyBuf, 0, keyBuf.length);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
         try {
-            mapContext.write(outputKey, outputValue);
+            mapContext.write(key, value);
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            throw new IOException(e);
         }
     }
 
@@ -83,15 +39,4 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
     public void close() {
 
     }
-
-    private void initVariables(Long cuboidId) {
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
-        keyBuf = rowKeyEncoder.createBuf();
-
-        dimensions = Long.bitCount(cuboidId);
-        measureColumnsIndex = new int[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 044dcca..bf08c97 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -98,6 +98,14 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
             throw new RuntimeException(e);
         } catch (ExecutionException e) {
             throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } catch (IOException e) {
+            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } finally {
+            try {
+                cuboidWriter.close();
+            } catch (IOException e) {
+                throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+            }
         }
     }
 
@@ -106,7 +114,9 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         try {
-            return cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+            CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+            segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+            return segment;
         } catch (IOException e) {
             throw new RuntimeException("failed to create IBuildable", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index c4dc0b5..ddc868d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -33,9 +33,8 @@
  */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
@@ -51,13 +50,14 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
 
 /**
  */
-public final class HBaseCuboidWriter implements ICuboidWriter {
+public class HBaseCuboidWriter implements ICuboidWriter {
 
-    private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
+    private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
 
     private static final int BATCH_PUT_THRESHOLD = 10000;
 
@@ -125,8 +125,8 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
         }
     }
 
-    public final void flush() {
-        try {
+    @Override
+    public final void flush() throws IOException {
             if (!puts.isEmpty()) {
                 long t = System.currentTimeMillis();
                 if (hTable != null) {
@@ -136,14 +136,12 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
                 logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
                 puts.clear();
             }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @Override
-    public void close() {
-
+    public void close() throws IOException {
+        flush();
+        IOUtils.closeQuietly(hTable);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 4c2737d..7bb3647 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -80,7 +80,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
             @Override
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeGCStep());
+                steps.addMergingGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index c3bd7b5..535f877 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -161,7 +161,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         toDeletePaths.addAll(getMergingHDFSPaths());
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
         step.setDeletePaths(toDeletePaths);
         step.setJobId(jobId);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index fdab8eb..19873c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -18,9 +18,11 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -54,7 +57,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
 
             final HTableInterface hTable;
             hTable = createHTable(cubeSegment);
-            return new HBaseCuboidWriter(cubeSegment, hTable);
+            List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
+            cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
+            cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
+            return new CompoundCuboidWriter(cuboidWriters);
         } catch (IOException e) {
             throw new RuntimeException("failed to get ICuboidWriter", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5410e62d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
new file mode 100644
index 0000000..4d76522
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -0,0 +1,75 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.KVGTRecordWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ */
+public class SequenceFileCuboidWriter extends KVGTRecordWriter {
+
+    private static final Logger logger = LoggerFactory.getLogger(SequenceFileCuboidWriter.class);
+    private SequenceFile.Writer writer = null;
+
+    public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
+        super(cubeDesc, segment);
+    }
+
+
+    @Override
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+        if (writer == null) {
+            synchronized (SequenceFileCuboidWriter.class) {
+                if (writer == null) {
+                    JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
+                    String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+                    Path cuboidPath = new Path(cuboidRoot);
+                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+                    try {
+                        if (fs.exists(cuboidPath)) {
+                            fs.delete(cuboidPath, true);
+                        }
+
+                        fs.mkdirs(cuboidPath);
+                    } finally {
+                        IOUtils.closeQuietly(fs);
+                    }
+
+                    Path cuboidFile = new Path(cuboidPath, "data.seq");
+                    logger.debug("Cuboid is written to " + cuboidFile);
+                    writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+                }
+            }
+        }
+
+        Text outputValue = new Text();
+        Text outputKey = new Text();
+        outputKey.set(key.array(), key.offset(), key.length());
+        outputValue.set(value.array(), value.offset(), value.length());
+        writer.append(outputKey, outputValue);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (writer != null) {
+            writer.hflush();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeQuietly(writer);
+    }
+}


[4/5] kylin git commit: KYLIN-1311 write rest servers to file

Posted by sh...@apache.org.
KYLIN-1311 write rest servers to file

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/710860d2
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/710860d2
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/710860d2

Branch: refs/heads/helix-201601
Commit: 710860d2d41c85a3548acabe92e25ff4e5ba06d3
Parents: 67e9202
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jan 24 21:41:56 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Jan 24 21:41:56 2016 +0800

----------------------------------------------------------------------
 build/bin/streaming_build.sh                    |  9 ++++--
 build/bin/streaming_fillgap.sh                  |  8 +++--
 .../org/apache/kylin/common/KylinConfig.java    | 31 ++++++++++++++++++++
 .../kylin/rest/helix/HelixClusterAdmin.java     | 27 ++++++++++-------
 4 files changed, 58 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/710860d2/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
index cb86e29..ffb6101 100644
--- a/build/bin/streaming_build.sh
+++ b/build/bin/streaming_build.sh
@@ -20,15 +20,18 @@
 source /etc/profile
 source ~/.bash_profile
 
-STREAMING=$1
+CUBE_NAME=$1
 INTERVAL=$2
 DELAY=$3
 MARGIN=$4
+AUTHORIZATION=$5
+KYLIN_HOST=$6
 CURRENT_TIME_IN_SECOND=`date +%s`
 CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
 START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
 END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
 
 ID="$START"_"$END"
-echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -oneoff true -start ${START} -end ${END} -streaming ${STREAMING} -margin ${MARGIN}
\ No newline at end of file
+echo "building for ${CUBE_NAME} ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
+#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE_NAME} ${ID} -oneoff true -start ${START} -end ${END} -streaming ${CUBE_NAME} -margin ${MARGIN}
+curl --request PUT --data "{\"start\": $START, \"end\": $END }" --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/build

http://git-wip-us.apache.org/repos/asf/kylin/blob/710860d2/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
index 74d9037..31c4886 100644
--- a/build/bin/streaming_fillgap.sh
+++ b/build/bin/streaming_fillgap.sh
@@ -20,8 +20,10 @@
 source /etc/profile
 source ~/.bash_profile
 
-streaming=$1
-margin=$2
+CUBE_NAME=$1
+AUTHORIZATION=$2
+KYLIN_HOST=$3
 
 cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
\ No newline at end of file
+#sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
+curl --request PUT --header "Authorization: Basic $AUTHORIZATION" --header "Content-Type: application/json" -v ${KYLIN_HOST}/kylin/api/streaming/${CUBE_NAME}/fillgap

http://git-wip-us.apache.org/repos/asf/kylin/blob/710860d2/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 81f5827..08fb6dd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -265,4 +265,35 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
+    public static void writeOverrideProperties(Properties properties) throws IOException {
+        File propFile = getKylinProperties();
+        File overrideFile = new File(propFile.getParentFile(), propFile.getName() + ".override");
+        overrideFile.createNewFile();
+        FileInputStream fis2 = null;
+        Properties override = new Properties();
+        try {
+            fis2 = new FileInputStream(overrideFile);
+            override.load(fis2);
+            for (Map.Entry<Object, Object> entries : properties.entrySet()) {
+                override.setProperty(entries.getKey().toString(), entries.getValue().toString());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            IOUtils.closeQuietly(fis2);
+        }
+
+        PrintWriter pw = null;
+        try {
+            pw = new PrintWriter(overrideFile);
+            for (Enumeration e = override.propertyNames(); e.hasMoreElements();) {
+                String key = (String) e.nextElement();
+                pw.println(key + "=" + override.getProperty(key));
+            }
+            pw.close();
+        } finally {
+            IOUtils.closeQuietly(pw);
+        }
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/710860d2/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 0758ef1..4da9a86 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -40,8 +40,10 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
@@ -101,7 +103,7 @@ public class HelixClusterAdmin {
         addInstance(instanceName, instanceTags);
         startInstance(instanceName);
 
-        rebalanceWithTag(instanceTags);
+        rebalanceWithTag(RESOURCE_NAME_JOB_ENGINE, TAG_JOB_ENGINE);
 
         boolean startController = kylinConfig.isClusterController();
         if (startController) {
@@ -123,7 +125,7 @@ public class HelixClusterAdmin {
 
         // add job engine as a resource, 1 partition
         if (!admin.getResourcesInCluster(clusterName).contains(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE)) {
-            admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
+            admin.addResource(clusterName, HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
         }
 
     }
@@ -134,8 +136,8 @@ public class HelixClusterAdmin {
             logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add.");
             admin.dropResource(clusterName, resourceName);
         }
-        admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
-        admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
+        admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.FULL_AUTO.name());
+        rebalanceWithTag(resourceName, TAG_STREAM_BUILDER);
 
     }
 
@@ -161,13 +163,9 @@ public class HelixClusterAdmin {
      * Rebalance the resource with the tags
      * @param tags
      */
-    protected void rebalanceWithTag(List<String> tags) {
-        for (String tag : tags) {
-            if (tag.equals(TAG_JOB_ENGINE)) {
-                List<String> instances = admin.getInstancesInClusterWithTag(clusterName, TAG_JOB_ENGINE);
-                admin.rebalance(clusterName, RESOURCE_NAME_JOB_ENGINE, instances.size(), "", tag);
-            }
-        }
+    protected void rebalanceWithTag(String resourceName, String tag) {
+        List<String> instances = admin.getInstancesInClusterWithTag(clusterName, tag);
+        admin.rebalance(clusterName, resourceName, instances.size(), "", tag);
     }
 
     /**
@@ -277,6 +275,13 @@ public class HelixClusterAdmin {
                 kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
                 System.setProperty("kylin.rest.servers", restServersInCluster);
                 logger.info("kylin.rest.servers update to " + restServersInCluster);
+                Properties properties = new Properties();
+                properties.setProperty("kylin.rest.servers", restServersInCluster);
+                try {
+                    KylinConfig.writeOverrideProperties(properties);
+                } catch (IOException e) {
+                    logger.error(e.getMessage(), e);
+                }
                 Broadcaster.clearCache();
             }
         }