You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/03/15 04:02:56 UTC
kylin git commit: stabilize BuildCubeWithStream
Repository: kylin
Updated Branches:
refs/heads/yang22 50955963d -> 0c6441c71
stabilize BuildCubeWithStream
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0c6441c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0c6441c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0c6441c7
Branch: refs/heads/yang22
Commit: 0c6441c710ddefaee6285d41f4eb20e06533c9f8
Parents: 5095596
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 15 12:02:52 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Mar 15 12:02:52 2017 +0800
----------------------------------------------------------------------
.../kylin/provision/BuildCubeWithStream.java | 76 ++++++++++++++++----
.../kylin/storage/hbase/util/ZookeeperUtil.java | 52 ++++++++++++++
2 files changed, 115 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6441c7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 8abb84c..030b7d6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -36,6 +36,10 @@ import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
@@ -63,6 +67,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.BrokerConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.kylin.storage.hbase.util.ZookeeperUtil;
import org.apache.kylin.tool.StorageCleanupJob;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -84,8 +89,14 @@ public class BuildCubeWithStream {
private KafkaConfig kafkaConfig;
private MockKafka kafkaServer;
+ private ZkConnection zkConnection;
+ private final String kafkaZkPath = "/" + UUID.randomUUID().toString();
+
protected static boolean fastBuildMode = false;
- private boolean generateData = true;
+ private volatile boolean generateData = true;
+ private volatile boolean generateDataDone = false;
+
+ private static final int BUILD_ROUND = 5;
public void before() throws Exception {
deployEnv();
@@ -126,8 +137,9 @@ public class BuildCubeWithStream {
private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) {
//Start mock Kakfa
- String zkConnectionStr = "sandbox:2181";
- ZkConnection zkConnection = new ZkConnection(zkConnectionStr);
+ String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath;
+ System.out.println("zkConnectionStr" + zkConnectionStr);
+ zkConnection = new ZkConnection(zkConnectionStr);
// Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState());
kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId());
kafkaServer.start();
@@ -171,18 +183,33 @@ public class BuildCubeWithStream {
try {
generateStreamData(dateStart, dateEnd, rand.nextInt(100));
dateStart = dateEnd;
- sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time
+ sleep(rand.nextInt(rand.nextInt(30)) * 1000); // wait random time
} catch (Exception e) {
e.printStackTrace();
}
}
+ generateDataDone = true;
}
}).start();
ExecutorService executorService = Executors.newCachedThreadPool();
List<FutureTask<ExecutableState>> futures = Lists.newArrayList();
- for (int i = 0; i < 5; i++) {
- Thread.sleep(2 * 60 * 1000); // wait for new messages
+ for (int i = 0; i < BUILD_ROUND; i++) {
+ if (i == (BUILD_ROUND - 1)) {
+ // stop generating message to kafka
+ generateData = false;
+ int waittime = 0;
+ while (generateDataDone == false && waittime < 100) {
+ Thread.sleep(1000);
+ waittime++;
+ }
+ if (generateDataDone == false) {
+ throw new IllegalStateException("Timeout when wait all messages be sent to Kafka"); // ensure all messages have been flushed.
+ }
+ } else {
+ Thread.sleep(30 * 1000); // wait for new messages
+ }
+
FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
@Override
public ExecutableState call() {
@@ -202,7 +229,7 @@ public class BuildCubeWithStream {
futures.add(futureTask);
}
- generateData = false; // stop generating message to kafka
+ generateData = false;
executorService.shutdown();
int succeedBuild = 0;
for (int i = 0; i < futures.size(); i++) {
@@ -265,8 +292,8 @@ public class BuildCubeWithStream {
protected void deployEnv() throws IOException {
DeployUtil.overrideJobJarLocations();
- // DeployUtil.initCliWorkDir();
- // DeployUtil.deployMetadata();
+ // DeployUtil.initCliWorkDir();
+ // DeployUtil.deployMetadata();
}
public static void beforeClass() throws Exception {
@@ -274,16 +301,31 @@ public class BuildCubeWithStream {
ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA);
if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
- throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2");
+ throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
}
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
public void after() {
kafkaServer.stop();
+ cleanKafkaZkPath(kafkaZkPath);
DefaultScheduler.destroyInstance();
}
+    /**
+     * Best-effort removal of a znode and all of its children from ZooKeeper.
+     *
+     * Connects with a short-lived Curator client built from
+     * ZookeeperUtil.getZKConnectString(). Any failure while starting the client
+     * or deleting the node is logged as a warning and not rethrown, so the
+     * remaining teardown steps still run.
+     *
+     * @param path absolute znode path to delete recursively
+     */
+    private void cleanKafkaZkPath(String path) {
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy);
+
+        try {
+            // start() sits inside the try so the client is closed even if it fails to start.
+            zkClient.start();
+            // Delete the path handed in by the caller (not the field), so the
+            // node removed always matches the one named in the warning below.
+            zkClient.delete().deletingChildrenIfNeeded().forPath(path);
+        } catch (Exception e) {
+            logger.warn("Failed to delete zookeeper path: " + path, e);
+        } finally {
+            zkClient.close();
+        }
+    }
+
protected void waitForJob(String jobId) {
while (true) {
AbstractExecutable job = jobService.getJob(jobId);
@@ -311,6 +353,9 @@ public class BuildCubeWithStream {
}
public static void main(String[] args) throws Exception {
+ long start = System.currentTimeMillis();
+ int exitCode = 0;
+
BuildCubeWithStream buildCubeWithStream = null;
try {
beforeClass();
@@ -318,13 +363,18 @@ public class BuildCubeWithStream {
buildCubeWithStream.before();
buildCubeWithStream.build();
logger.info("Build is done");
+
+ buildCubeWithStream.after();
buildCubeWithStream.cleanup();
logger.info("Going to exit");
- System.exit(0);
} catch (Throwable e) {
logger.error("error", e);
- System.exit(1);
+ exitCode = 1;
}
+ long millis = System.currentTimeMillis() - start;
+ System.out.println("Time elapsed: " + (millis / 1000) + " sec - in " + BuildCubeWithStream.class.getName());
+
+ System.exit(exitCode);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6441c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
new file mode 100644
index 0000000..b5ebe89
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+public class ZookeeperUtil {
+
+    /**
+     * Builds a ZooKeeper connect string from the current HBase configuration.
+     *
+     * The quorum setting (HConstants.ZOOKEEPER_QUORUM) is split on commas and
+     * each entry is trimmed. An entry that already carries its own port
+     * (contains ':') is kept as is; otherwise the configured client port
+     * (HConstants.ZOOKEEPER_CLIENT_PORT) is appended. The entries are then
+     * re-joined with commas, e.g. "host1:2181,host2:2181".
+     *
+     * @return ZooKeeper connection string in "host:port[,host:port...]" form
+     * @throws IllegalStateException if the quorum is not set in the HBase configuration
+     */
+    public static String getZKConnectString() {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        if (serverList == null) {
+            // Fail with a descriptive message instead of a bare NullPointerException on split().
+            throw new IllegalStateException(HConstants.ZOOKEEPER_QUORUM + " is not set in the HBase configuration");
+        }
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                String host = input.trim();
+                // Do not append the port twice when the quorum entry is already "host:port".
+                return host.contains(":") ? host : host + ":" + port;
+            }
+        }), ",");
+    }
+}