You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/03/15 04:02:56 UTC

kylin git commit: stablize BuildCubeWithStream

Repository: kylin
Updated Branches:
  refs/heads/yang22 50955963d -> 0c6441c71


stablize BuildCubeWithStream


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0c6441c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0c6441c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0c6441c7

Branch: refs/heads/yang22
Commit: 0c6441c710ddefaee6285d41f4eb20e06533c9f8
Parents: 5095596
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 15 12:02:52 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 15 12:02:52 2017 +0800

----------------------------------------------------------------------
 .../kylin/provision/BuildCubeWithStream.java    | 76 ++++++++++++++++----
 .../kylin/storage/hbase/util/ZookeeperUtil.java | 52 ++++++++++++++
 2 files changed, 115 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6441c7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 8abb84c..030b7d6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -36,6 +36,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
@@ -63,6 +67,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
 import org.apache.kylin.tool.StorageCleanupJob;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -84,8 +89,14 @@ public class BuildCubeWithStream {
 
     private KafkaConfig kafkaConfig;
     private MockKafka kafkaServer;
+    private ZkConnection zkConnection;
+    private final String kafkaZkPath = "/" + UUID.randomUUID().toString();
+
     protected static boolean fastBuildMode = false;
-    private boolean generateData = true;
+    private volatile boolean generateData = true;
+    private volatile boolean generateDataDone = false;
+
+    private static final int BUILD_ROUND = 5;
 
     public void before() throws Exception {
         deployEnv();
@@ -126,8 +137,9 @@ public class BuildCubeWithStream {
 
     private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
         //Start mock Kakfa
-        String zkConnectionStr = "sandbox:2181";
-        ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
+        String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
+        System.out.println("zkConnectionStr" + zkConnectionStr);
+        zkConnection = new ZkConnection(zkConnectionStr);
         // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
         kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
         kafkaServer.start();
@@ -171,18 +183,33 @@ public class BuildCubeWithStream {
                     try {
                         generateStreamData(dateStart, dateEnd, rand.nextInt(100));
                         dateStart = dateEnd;
-                        sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time
+                        sleep(rand.nextInt(rand.nextInt(30)) * 1000); // wait random time
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
+                generateDataDone = true;
             }
         }).start();
         ExecutorService executorService = Executors.newCachedThreadPool();
 
         List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
-        for (int i = 0; i < 5; i++) {
-            Thread.sleep(2 * 60 * 1000); // wait for new messages
+        for (int i = 0; i < BUILD_ROUND; i++) {
+            if (i == (BUILD_ROUND - 1)) {
+                // stop generating message to kafka
+                generateData = false;
+                int waittime = 0;
+                while (generateDataDone == false && waittime < 100) {
+                    Thread.sleep(1000);
+                    waittime++;
+                }
+                if (generateDataDone == false) {
+                    throw new IllegalStateException("Timeout when wait all messages be sent to Kafka"); // ensure all messages have been flushed.
+                }
+            } else {
+                Thread.sleep(30 * 1000); // wait for new messages
+            }
+
             FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
                 @Override
                 public ExecutableState call() {
@@ -202,7 +229,7 @@ public class BuildCubeWithStream {
             futures.add(futureTask);
         }
 
-        generateData = false; // stop generating message to kafka
+        generateData = false;
         executorService.shutdown();
         int succeedBuild = 0;
         for (int i = 0; i < futures.size(); i++) {
@@ -265,8 +292,8 @@ public class BuildCubeWithStream {
 
     protected void deployEnv() throws IOException {
         DeployUtil.overrideJobJarLocations();
-        //        DeployUtil.initCliWorkDir();
-        //        DeployUtil.deployMetadata();
+        //                DeployUtil.initCliWorkDir();
+        //                DeployUtil.deployMetadata();
     }
 
     public static void beforeClass() throws Exception {
@@ -274,16 +301,31 @@ public class BuildCubeWithStream {
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
         System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
-            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+            throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
         }
         HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
 
     public void after() {
         kafkaServer.stop();
+        cleanKafkaZkPath(kafkaZkPath);
         DefaultScheduler.destroyInstance();
     }
 
+    private void cleanKafkaZkPath(String path) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy);
+        zkClient.start();
+
+        try {
+            zkClient.delete().deletingChildrenIfNeeded().forPath(kafkaZkPath);
+        } catch (Exception e) {
+            logger.warn("Failed to delete zookeeper path: " + path, e);
+        } finally {
+            zkClient.close();
+        }
+    }
+
     protected void waitForJob(String jobId) {
         while (true) {
             AbstractExecutable job = jobService.getJob(jobId);
@@ -311,6 +353,9 @@ public class BuildCubeWithStream {
     }
 
     public static void main(String[] args) throws Exception {
+        long start = System.currentTimeMillis();
+        int exitCode = 0;
+
         BuildCubeWithStream buildCubeWithStream = null;
         try {
             beforeClass();
@@ -318,13 +363,18 @@ public class BuildCubeWithStream {
             buildCubeWithStream.before();
             buildCubeWithStream.build();
             logger.info("Build is done");
+
+            buildCubeWithStream.after();
             buildCubeWithStream.cleanup();
             logger.info("Going to exit");
-            System.exit(0);
         } catch (Throwable e) {
             logger.error("error", e);
-            System.exit(1);
+            exitCode = 1;
         }
 
+        long millis = System.currentTimeMillis() - start;
+        System.out.println("Time elapsed: " + (millis / 1000) + " sec - in " + BuildCubeWithStream.class.getName());
+
+        System.exit(exitCode);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6441c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
new file mode 100644
index 0000000..b5ebe89
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+public class ZookeeperUtil {
+
+    /**
+     * Get zookeeper connection string from HBase Configuration
+     *
+     * @return Zookeeper Connection string
+     */
+    public static String getZKConnectString() {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+}