You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/26 14:14:15 UTC
[kylin] branch realtime-streaming updated: KYLIN-3768 Save
streaming metadata to a standard kylin path in zookeeper
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/realtime-streaming by this push:
new c137bc8 KYLIN-3768 Save streaming metadata to a standard kylin path in zookeeper
c137bc8 is described below
commit c137bc80a8bdf49f41a5214541ca42e5ff4cc6ef
Author: chao long <wa...@qq.com>
AuthorDate: Tue Mar 26 18:12:22 2019 +0800
KYLIN-3768 Save streaming metadata to a standard kylin path in zookeeper
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 --
.../java/org/apache/kylin/common/util/ZKUtil.java | 8 ++-
.../kylin/provision/BuildCubeWithStream.java | 2 +-
.../kylin/realtime/BuildCubeWithStreamV2.java | 14 +----
.../kylin/stream/coordinator/Coordinator.java | 6 +-
.../coordinator/StreamMetadataStoreFactory.java | 4 +-
.../kylin/stream/coordinator/StreamingUtils.java | 34 ++++++++++
.../apache/kylin/stream/coordinator/ZKUtils.java | 72 ----------------------
.../coordinator/ZookeeperStreamMetadataStore.java | 6 +-
.../kylin/stream/coordinator/CoordinatorTest.java | 7 ++-
.../stream/server/ReplicaSetLeaderSelector.java | 4 +-
.../kylin/stream/server/StreamingServer.java | 4 +-
12 files changed, 59 insertions(+), 106 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 26dc711..ea182d8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2052,10 +2052,6 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.stream.metadata.store.type", "zk");
}
- public String getStreamingCoordinateZK() {
- return getOptional("kylin.stream.zookeeper", null);
- }
-
public String getStreamingSegmentRetentionPolicy() {
return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
index 5e032f4..5e02a19 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
@@ -106,6 +106,10 @@ public class ZKUtil {
return zkString;
}
+ public static String getZkRootBasedPath(String path) {
+ return zkChRoot + "/" + path;
+ }
+
public static CuratorFramework getZookeeperClient(KylinConfig config) {
RetryPolicy retryPolicy = getRetryPolicy(config);
return getZookeeperClient(getZKConnectString(config), retryPolicy);
@@ -215,15 +219,13 @@ public class ZKUtil {
}), ",");
}
-
-
public static void cleanZkPath(String path) {
CuratorFramework zkClient = ZKUtil.newZookeeperClient();
try {
zkClient.delete().deletingChildrenIfNeeded().forPath(path);
} catch (Exception e) {
- logger.warn("Failed to delete zookeeper path: " + path, e);
+ logger.warn("Failed to delete zookeeper path: {}", path, e);
} finally {
zkClient.close();
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index ef0316c..f09825b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -88,7 +88,7 @@ public class BuildCubeWithStream {
private KafkaConfig kafkaConfig;
private MockKafka kafkaServer;
private ZkConnection zkConnection;
- private final String kafkaZkPath = "/kylin/streaming/" + RandomUtil.randomUUID().toString();
+ private final String kafkaZkPath = ZKUtil.getZkRootBasedPath("streaming") + "/" + RandomUtil.randomUUID().toString();
protected static boolean fastBuildMode = false;
private volatile boolean generateData = true;
private volatile boolean generateDataDone = false;
diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
index 94d459d..1c4a934 100644
--- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
@@ -53,7 +53,7 @@ import org.apache.kylin.query.KylinTestBase;
import org.apache.kylin.rest.job.StorageCleanupJob;
import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.apache.kylin.stream.coordinator.Coordinator;
-import org.apache.kylin.stream.coordinator.ZKUtils;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.consumer.ConsumerStartMode;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
@@ -81,7 +81,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamV2.class);
private static final String CUBE_NAME = "test_streaming_v2_user_info_cube";
- private final String kafkaZkPath = "/kylin/streamingv2/" + RandomUtil.randomUUID().toString();
+ private final String kafkaZkPath = ZKUtil.getZkRootBasedPath("streamingv2") + "/" + RandomUtil.randomUUID().toString();
private final String messageFile = "src/test/resources/streaming_v2_user_info_messages.txt";
private String topicName;
@@ -152,20 +152,12 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
deployEnv();
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-// ExecutableManager jobService = ExecutableManager.getInstance(kylinConfig);
scheduler = DefaultScheduler.createInstance();
scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
}
-// for (String jobId : jobService.getAllJobIds()) {
-// AbstractExecutable executable = jobService.getJob(jobId);
-// if (executable instanceof CubingJob || executable instanceof CheckpointExecutable) {
-// jobService.deleteJob(jobId);
-// }
-// }
-
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
final String streamingTableName = cubeInstance.getRootFactTable();
final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName);
@@ -355,7 +347,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
}
public static void cleanStreamZkRoot() {
- ZKUtil.cleanZkPath(ZKUtils.ZK_ROOT);
+ ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT);
}
public static void main(String[] args) {
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 1e0750e..66a9c01 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -141,7 +141,7 @@ public class Coordinator implements CoordinatorClient {
this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
this.receiverAdminClient = new HttpReceiverAdminClient();
this.assigner = getAssigner();
- this.zkClient = ZKUtils.getZookeeperClient();
+ this.zkClient = StreamingUtils.getZookeeperClient();
this.selector = new CoordinatorLeaderSelector();
this.jobStatusChecker = new StreamingBuildJobStatusChecker();
this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1,
@@ -156,7 +156,7 @@ public class Coordinator implements CoordinatorClient {
this.streamMetadataStore = metadataStore;
this.receiverAdminClient = receiverClient;
this.assigner = getAssigner();
- this.zkClient = ZKUtils.getZookeeperClient();
+ this.zkClient = StreamingUtils.getZookeeperClient();
this.selector = new CoordinatorLeaderSelector();
this.jobStatusChecker = new StreamingBuildJobStatusChecker();
this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1,
@@ -1273,7 +1273,7 @@ public class Coordinator implements CoordinatorClient {
private LeaderSelector leaderSelector;
public CoordinatorLeaderSelector() {
- String path = ZKUtils.COORDINATOR_LEAD;
+ String path = StreamingUtils.COORDINATOR_LEAD;
leaderSelector = new LeaderSelector(zkClient, path, this);
leaderSelector.autoRequeue();
}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java
index a2a20de..ac63659 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java
@@ -18,7 +18,6 @@
package org.apache.kylin.stream.coordinator;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.kylin.common.KylinConfig;
public class StreamMetadataStoreFactory {
@@ -45,8 +44,7 @@ public class StreamMetadataStoreFactory {
}
public static StreamMetadataStore getZKStreamMetaDataStore() {
- CuratorFramework client = ZKUtils.getZookeeperClient();
- StreamMetadataStore store = new ZookeeperStreamMetadataStore(client);
+ StreamMetadataStore store = new ZookeeperStreamMetadataStore();
return store;
}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java
new file mode 100644
index 0000000..3500543
--- /dev/null
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.stream.coordinator;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ZKUtil;
+
+
+public class StreamingUtils {
+ public static final String STREAM_ZK_ROOT = "/stream";
+ public static final String COORDINATOR_LEAD = STREAM_ZK_ROOT + "/coordinator";
+ public static final String REPLICASETS_LEADER_ELECT = STREAM_ZK_ROOT + "/replica_sets_lead";
+
+ public static CuratorFramework getZookeeperClient() {
+ return ZKUtil.getZookeeperClient(KylinConfig.getInstanceFromEnv());
+ }
+}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java
deleted file mode 100644
index f21f478..0000000
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.stream.coordinator;
-
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
-public class ZKUtils {
- public static final String ZK_ROOT = KylinConfig.getInstanceFromEnv().getZookeeperBasePath() + "/stream/" + KylinConfig.getInstanceFromEnv().getDeployEnv();
- public static final String COORDINATOR_LEAD = ZK_ROOT + "/coordinator";
- public static final String REPLICASETS_LEADER_ELECT = ZK_ROOT + "/replica_sets_lead";
- private static final Logger logger = LoggerFactory.getLogger(ZKUtils.class);
-
- public static CuratorFramework getZookeeperClient() {
- String zkString = KylinConfig.getInstanceFromEnv().getStreamingCoordinateZK();
- if (zkString == null) {
- zkString = getHBaseZKConnString();
- logger.info("streaming zk is not config, use hbase zookeeper:{}", zkString);
- }
- CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkString)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(15 * 1000)
- .sessionTimeoutMs(60 * 1000).build();
- client.start();
- return client;
- }
-
- public static String getHBaseZKConnString() {
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
- final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
- return StringUtils.join(
- Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
- @Nullable
- @Override
- public String apply(String input) {
- return input + ":" + port;
- }
- }), ",");
- }
-
-}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
index 4cad0e5..2c3acb2 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
@@ -61,9 +61,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
private String cubeRoot;
private String coordinatorRoot;
- public ZookeeperStreamMetadataStore(CuratorFramework client) {
- this.client = client;
- this.zkRoot = ZKUtils.ZK_ROOT;
+ public ZookeeperStreamMetadataStore() {
+ this.client = StreamingUtils.getZookeeperClient();
+ this.zkRoot = StreamingUtils.STREAM_ZK_ROOT;
init();
}
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
index 70529a5..591ce8c 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator;
import com.google.common.collect.Lists;
import org.apache.curator.test.TestingServer;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
@@ -102,7 +103,7 @@ public class CoordinatorTest extends LocalFileMetadataTestCase {
staticCreateTestMetadata();
testingServer = new TestingServer(12181, false);
testingServer.start();
- System.setProperty("kylin.stream.zookeeper", "localhost:12181");
+ System.setProperty("kylin.env.zookeeper-connect-string", "localhost:12181");
metadataStore = StreamMetadataStoreFactory.getZKStreamMetaDataStore();
initZookeeperMetadataStore();
mockCube();
@@ -111,8 +112,10 @@ public class CoordinatorTest extends LocalFileMetadataTestCase {
@After
public void tearDown() throws Exception {
coordinator = null;
- System.clearProperty("kylin.stream.zookeeper");
+ ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT);
+ StreamingUtils.getZookeeperClient().close();
testingServer.stop();// clear metadata
+ System.clearProperty("kylin.env.zookeeper-connect-string");
}
private void initZookeeperMetadataStore() {
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
index 80a6adc..e7bdbde 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.kylin.stream.coordinator.ZKUtils;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
import org.apache.kylin.stream.core.model.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class ReplicaSetLeaderSelector extends LeaderSelectorListenerAdapter impl
public ReplicaSetLeaderSelector(CuratorFramework client, Node currNode, int replicaSetID) {
this.node = currNode;
this.replicaSetID = replicaSetID;
- String path = ZKUtils.REPLICASETS_LEADER_ELECT + "/" + replicaSetID;
+ String path = StreamingUtils.REPLICASETS_LEADER_ELECT + "/" + replicaSetID;
leaderSelector = new LeaderSelector(client, path, this);
leaderSelector.autoRequeue();
leaderChangeListeners = Lists.newArrayList();
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 32b8a2b..01e4aa4 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -57,7 +57,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
-import org.apache.kylin.stream.coordinator.ZKUtils;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.client.HttpCoordinatorClient;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
@@ -121,7 +121,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
private String baseStorePath;
private StreamingServer() {
- streamZKClient = ZKUtils.getZookeeperClient();
+ streamZKClient = StreamingUtils.getZookeeperClient();
streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
coordinatorClient = new HttpCoordinatorClient(streamMetadataStore);
currentNode = NodeUtil.getCurrentNode(DEFAULT_PORT);