You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/26 14:14:15 UTC

[kylin] branch realtime-streaming updated: KYLIN-3768 Save streaming metadata under a standard kylin path in zookeeper

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/realtime-streaming by this push:
     new c137bc8  KYLIN-3768 Save streaming metadata under a standard kylin path in zookeeper
c137bc8 is described below

commit c137bc80a8bdf49f41a5214541ca42e5ff4cc6ef
Author: chao long <wa...@qq.com>
AuthorDate: Tue Mar 26 18:12:22 2019 +0800

    KYLIN-3768 Save streaming metadata under a standard kylin path in zookeeper
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 --
 .../java/org/apache/kylin/common/util/ZKUtil.java  |  8 ++-
 .../kylin/provision/BuildCubeWithStream.java       |  2 +-
 .../kylin/realtime/BuildCubeWithStreamV2.java      | 14 +----
 .../kylin/stream/coordinator/Coordinator.java      |  6 +-
 .../coordinator/StreamMetadataStoreFactory.java    |  4 +-
 .../kylin/stream/coordinator/StreamingUtils.java   | 34 ++++++++++
 .../apache/kylin/stream/coordinator/ZKUtils.java   | 72 ----------------------
 .../coordinator/ZookeeperStreamMetadataStore.java  |  6 +-
 .../kylin/stream/coordinator/CoordinatorTest.java  |  7 ++-
 .../stream/server/ReplicaSetLeaderSelector.java    |  4 +-
 .../kylin/stream/server/StreamingServer.java       |  4 +-
 12 files changed, 59 insertions(+), 106 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 26dc711..ea182d8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2052,10 +2052,6 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.stream.metadata.store.type", "zk");
     }
 
-    public String getStreamingCoordinateZK() {
-        return getOptional("kylin.stream.zookeeper", null);
-    }
-
     public String getStreamingSegmentRetentionPolicy() {
         return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
index 5e032f4..5e02a19 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ZKUtil.java
@@ -106,6 +106,10 @@ public class ZKUtil {
         return zkString;
     }
 
+    public static String getZkRootBasedPath(String path) {
+        return zkChRoot + "/" + path;
+    }
+
     public static CuratorFramework getZookeeperClient(KylinConfig config) {
         RetryPolicy retryPolicy = getRetryPolicy(config);
         return getZookeeperClient(getZKConnectString(config), retryPolicy);
@@ -215,15 +219,13 @@ public class ZKUtil {
                 }), ",");
     }
 
-
-
     public static void cleanZkPath(String path) {
         CuratorFramework zkClient = ZKUtil.newZookeeperClient();
 
         try {
             zkClient.delete().deletingChildrenIfNeeded().forPath(path);
         } catch (Exception e) {
-            logger.warn("Failed to delete zookeeper path: " + path, e);
+            logger.warn("Failed to delete zookeeper path: {}", path, e);
         } finally {
             zkClient.close();
         }
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index ef0316c..f09825b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -88,7 +88,7 @@ public class BuildCubeWithStream {
     private KafkaConfig kafkaConfig;
     private MockKafka kafkaServer;
     private ZkConnection zkConnection;
-    private final String kafkaZkPath = "/kylin/streaming/" + RandomUtil.randomUUID().toString();
+    private final String kafkaZkPath = ZKUtil.getZkRootBasedPath("streaming") + "/" + RandomUtil.randomUUID().toString();
     protected static boolean fastBuildMode = false;
     private volatile boolean generateData = true;
     private volatile boolean generateDataDone = false;
diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
index 94d459d..1c4a934 100644
--- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
@@ -53,7 +53,7 @@ import org.apache.kylin.query.KylinTestBase;
 import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
 import org.apache.kylin.stream.coordinator.Coordinator;
-import org.apache.kylin.stream.coordinator.ZKUtils;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
 import org.apache.kylin.stream.core.client.ReceiverAdminClient;
 import org.apache.kylin.stream.core.consumer.ConsumerStartMode;
 import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
@@ -81,7 +81,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStreamV2.class);
 
     private static final String CUBE_NAME = "test_streaming_v2_user_info_cube";
-    private final String kafkaZkPath = "/kylin/streamingv2/" + RandomUtil.randomUUID().toString();
+    private final String kafkaZkPath = ZKUtil.getZkRootBasedPath("streamingv2") + "/" + RandomUtil.randomUUID().toString();
     private final String messageFile = "src/test/resources/streaming_v2_user_info_messages.txt";
 
     private String topicName;
@@ -152,20 +152,12 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
         deployEnv();
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-//        ExecutableManager jobService = ExecutableManager.getInstance(kylinConfig);
         scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
         }
 
-//        for (String jobId : jobService.getAllJobIds()) {
-//            AbstractExecutable executable = jobService.getJob(jobId);
-//            if (executable instanceof CubingJob || executable instanceof CheckpointExecutable) {
-//                jobService.deleteJob(jobId);
-//            }
-//        }
-
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
         final String streamingTableName = cubeInstance.getRootFactTable();
         final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName);
@@ -355,7 +347,7 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
     }
 
     public static void cleanStreamZkRoot() {
-        ZKUtil.cleanZkPath(ZKUtils.ZK_ROOT);
+        ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT);
     }
 
     public static void main(String[] args) {
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 1e0750e..66a9c01 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -141,7 +141,7 @@ public class Coordinator implements CoordinatorClient {
         this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
         this.receiverAdminClient = new HttpReceiverAdminClient();
         this.assigner = getAssigner();
-        this.zkClient = ZKUtils.getZookeeperClient();
+        this.zkClient = StreamingUtils.getZookeeperClient();
         this.selector = new CoordinatorLeaderSelector();
         this.jobStatusChecker = new StreamingBuildJobStatusChecker();
         this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1,
@@ -156,7 +156,7 @@ public class Coordinator implements CoordinatorClient {
         this.streamMetadataStore = metadataStore;
         this.receiverAdminClient = receiverClient;
         this.assigner = getAssigner();
-        this.zkClient = ZKUtils.getZookeeperClient();
+        this.zkClient = StreamingUtils.getZookeeperClient();
         this.selector = new CoordinatorLeaderSelector();
         this.jobStatusChecker = new StreamingBuildJobStatusChecker();
         this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1,
@@ -1273,7 +1273,7 @@ public class Coordinator implements CoordinatorClient {
         private LeaderSelector leaderSelector;
 
         public CoordinatorLeaderSelector() {
-            String path = ZKUtils.COORDINATOR_LEAD;
+            String path = StreamingUtils.COORDINATOR_LEAD;
             leaderSelector = new LeaderSelector(zkClient, path, this);
             leaderSelector.autoRequeue();
         }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java
index a2a20de..ac63659 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStoreFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.stream.coordinator;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.kylin.common.KylinConfig;
 
 public class StreamMetadataStoreFactory {
@@ -45,8 +44,7 @@ public class StreamMetadataStoreFactory {
     }
 
     public static StreamMetadataStore getZKStreamMetaDataStore() {
-        CuratorFramework client = ZKUtils.getZookeeperClient();
-        StreamMetadataStore store = new ZookeeperStreamMetadataStore(client);
+        StreamMetadataStore store = new ZookeeperStreamMetadataStore();
         return store;
     }
 
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java
new file mode 100644
index 0000000..3500543
--- /dev/null
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamingUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.stream.coordinator;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ZKUtil;
+
+
+public class StreamingUtils {
+    public static final String STREAM_ZK_ROOT = "/stream";
+    public static final String COORDINATOR_LEAD = STREAM_ZK_ROOT + "/coordinator";
+    public static final String REPLICASETS_LEADER_ELECT = STREAM_ZK_ROOT + "/replica_sets_lead";
+
+    public static CuratorFramework getZookeeperClient() {
+        return ZKUtil.getZookeeperClient(KylinConfig.getInstanceFromEnv());
+    }
+}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java
deleted file mode 100644
index f21f478..0000000
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZKUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.stream.coordinator;
-
-import java.util.Arrays;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
-public class ZKUtils {
-    public static final String ZK_ROOT = KylinConfig.getInstanceFromEnv().getZookeeperBasePath() + "/stream/" + KylinConfig.getInstanceFromEnv().getDeployEnv();
-    public static final String COORDINATOR_LEAD = ZK_ROOT + "/coordinator";
-    public static final String REPLICASETS_LEADER_ELECT = ZK_ROOT + "/replica_sets_lead";
-    private static final Logger logger = LoggerFactory.getLogger(ZKUtils.class);
-
-    public static CuratorFramework getZookeeperClient() {
-        String zkString = KylinConfig.getInstanceFromEnv().getStreamingCoordinateZK();
-        if (zkString == null) {
-            zkString = getHBaseZKConnString();
-            logger.info("streaming zk is not config, use hbase zookeeper:{}", zkString);
-        }
-        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkString)
-                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(15 * 1000)
-                .sessionTimeoutMs(60 * 1000).build();
-        client.start();
-        return client;
-    }
-
-    public static String getHBaseZKConnString() {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
-        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return StringUtils.join(
-                Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
-                    @Nullable
-                    @Override
-                    public String apply(String input) {
-                        return input + ":" + port;
-                    }
-                }), ",");
-    }
-
-}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
index 4cad0e5..2c3acb2 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
@@ -61,9 +61,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
     private String cubeRoot;
     private String coordinatorRoot;
 
-    public ZookeeperStreamMetadataStore(CuratorFramework client) {
-        this.client = client;
-        this.zkRoot = ZKUtils.ZK_ROOT;
+    public ZookeeperStreamMetadataStore() {
+        this.client = StreamingUtils.getZookeeperClient();
+        this.zkRoot = StreamingUtils.STREAM_ZK_ROOT;
         init();
     }
 
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
index 70529a5..591ce8c 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/CoordinatorTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator;
 import com.google.common.collect.Lists;
 import org.apache.curator.test.TestingServer;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.ZKUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
@@ -102,7 +103,7 @@ public class CoordinatorTest extends LocalFileMetadataTestCase {
         staticCreateTestMetadata();
         testingServer = new TestingServer(12181, false);
         testingServer.start();
-        System.setProperty("kylin.stream.zookeeper", "localhost:12181");
+        System.setProperty("kylin.env.zookeeper-connect-string", "localhost:12181");
         metadataStore = StreamMetadataStoreFactory.getZKStreamMetaDataStore();
         initZookeeperMetadataStore();
         mockCube();
@@ -111,8 +112,10 @@ public class CoordinatorTest extends LocalFileMetadataTestCase {
     @After
     public void tearDown() throws Exception {
         coordinator = null;
-        System.clearProperty("kylin.stream.zookeeper");
+        ZKUtil.cleanZkPath(StreamingUtils.STREAM_ZK_ROOT);
+        StreamingUtils.getZookeeperClient().close();
         testingServer.stop();// clear metadata
+        System.clearProperty("kylin.env.zookeeper-connect-string");
     }
 
     private void initZookeeperMetadataStore() {
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
index 80a6adc..e7bdbde 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.kylin.stream.coordinator.ZKUtils;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
 import org.apache.kylin.stream.core.model.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class ReplicaSetLeaderSelector extends LeaderSelectorListenerAdapter impl
     public ReplicaSetLeaderSelector(CuratorFramework client, Node currNode, int replicaSetID) {
         this.node = currNode;
         this.replicaSetID = replicaSetID;
-        String path = ZKUtils.REPLICASETS_LEADER_ELECT + "/" + replicaSetID;
+        String path = StreamingUtils.REPLICASETS_LEADER_ELECT + "/" + replicaSetID;
         leaderSelector = new LeaderSelector(client, path, this);
         leaderSelector.autoRequeue();
         leaderChangeListeners = Lists.newArrayList();
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 32b8a2b..01e4aa4 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -57,7 +57,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.stream.coordinator.StreamMetadataStore;
 import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
-import org.apache.kylin.stream.coordinator.ZKUtils;
+import org.apache.kylin.stream.coordinator.StreamingUtils;
 import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
 import org.apache.kylin.stream.coordinator.client.HttpCoordinatorClient;
 import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
@@ -121,7 +121,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
     private String baseStorePath;
 
     private StreamingServer() {
-        streamZKClient = ZKUtils.getZookeeperClient();
+        streamZKClient = StreamingUtils.getZookeeperClient();
         streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
         coordinatorClient = new HttpCoordinatorClient(streamMetadataStore);
         currentNode = NodeUtil.getCurrentNode(DEFAULT_PORT);