Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/08/01 23:12:42 UTC

[GitHub] [dolphinscheduler] ruanwenjun commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

ruanwenjun commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r934871197


##########
dolphinscheduler-dist/release-docs/LICENSE:
##########
@@ -445,7 +445,17 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
     websocket-common 9.4.44.v20210927: https://mvnrepository.com/artifact/org.eclipse.jetty.websocket/websocket-common/9.4.44.v20210927, Apache 2.0
     zeppelin-client 0.10.1: https://mvnrepository.com/artifact/org.apache.zeppelin/zeppelin-client/0.10.1, Apache 2.0
     zeppelin-common 0.10.1: https://mvnrepository.com/artifact/org.apache.zeppelin/zeppelin-common/0.10.1, Apache 2.0
-
+    jraft-core 1.3.11: https://mvnrepository.com/artifact/com.alipay.sofa/jraft-core/1.3.11, Apache 2.0
+    jraft-rheakv-core 1.3.11: https://mvnrepository.com/artifact/com.alipay.sofa/jraft-rheakv-core/1.3.11, Apache 2.0
+    sofa-common-tools 1.0.12: https://mvnrepository.com/artifact/com.alipay.sofa.common/sofa-common-tools/1.0.12, Apache 2.0
+    bolt 1.6.4: https://mvnrepository.com/artifact/com.alipay.sofa/bolt/1.6.4, Apache 2.0
+    hessian 3.3.6: https://mvnrepository.com/artifact/com.alipay.sofa/hessian/3.3.6, Apache 2.0
+    affinity 3.1.7: https://mvnrepository.com/artifact/net.openhft/affinity/3.1.7, Apache 2.0
+    annotations 12.0: https://mvnrepository.com/artifact/com.intellij/annotations/12.0, Apache 2.0
+    disruptor 3.3.7: https://mvnrepository.com/artifact/com.lmax/disruptor/3.3.7, Apache 2.0
+    jctools-core 2.1.1: https://mvnrepository.com/artifact/org.jctools/jctools-core/2.1.1, Apache 2.0
+    metrics-core 4.1.26: https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core/4.1.26, Apache 2.0
+    rocksdbjni 6.22.1.1: https://search.maven.org/artifact/org.rocksdb/rocksdbjni/6.22.1.1, Apache 2.0

Review Comment:
   ```suggestion
       rocksdbjni 6.22.1.1: https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.22.1.1, Apache 2.0
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/README.md:
##########
@@ -0,0 +1,34 @@
+# Introduction
+
+This module is the Raft consensus algorithm registry plugin. It uses a Raft cluster as the registry center.
+
+# How to use
+
+To use Raft as the registry center, set the registry properties in the `application.yml` of the master, worker, and api servers.
+
+In `server-address-list`, add the `/learner` suffix to the worker and api addresses to indicate that they do not participate in leader election.
+
+Remember to change `server-port` in each `application.yml`.
+
+NOTE: In a production environment, the number of masters must be odd (e.g. 3 or 5) to achieve high availability.
+
+```yaml
+registry:
+  type: raft
+  cluster-name: dolphinscheduler
+  server-address-list: 127.0.0.1:8181,127.0.0.1:8182/learner,127.0.0.1:8183/learner
+  log-storage-dir: raft-data/
+  election-timeout: 1000ms
+  listener-check-interval: 2s
+  distributed-lock-timeout: 3s
+  server-address: 127.0.0.1
+  server-port: 8183
+  module: api
+  rpc-core-threads: 8
+  rpc-timeout-millis: 5000ms

Review Comment:
   Do these properties have default values?



##########
dolphinscheduler-master/pom.xml:
##########
@@ -225,6 +229,10 @@
                     <artifactId>jersey-core</artifactId>
                     <groupId>com.sun.jersey</groupId>
                 </exclusion>
+                <exclusion>

Review Comment:
   It would be better to define the `protobuf-java` version in the BOM; then you won't need to exclude it everywhere.



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/README.md:
##########
@@ -0,0 +1,34 @@
+```yaml
+registry:
+  type: raft
+  cluster-name: dolphinscheduler
+  server-address-list: 127.0.0.1:8181,127.0.0.1:8182/learner,127.0.0.1:8183/learner

Review Comment:
   You can use a YAML list directly.
   ```suggestion
     server-address-list:
       - 127.0.0.1:8181
       - 127.0.0.1:8182/learner
       - 127.0.0.1:8183/learner
   ```
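   If the property becomes a YAML list, `RaftRegistryProperties.serverAddressList` would presumably become a `List<String>`, while the `RaftRegistry` constructor in this PR passes it to `withInitialServerList(...)` as a single comma-separated `String`. A minimal sketch of bridging the two, assuming the list form; `ServerAddressLists` and its methods are hypothetical names, not part of the PR or jraft:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical helper: none of these names come from the PR or from jraft.
   final class ServerAddressLists {

       // Suffix the README uses to mark non-voting (learner) members.
       static final String LEARNER_SUFFIX = "/learner";

       private ServerAddressLists() {
       }

       // Joins the YAML list back into the comma-separated form that a
       // String-typed server list expects, trimming stray whitespace.
       static String toInitialServerList(List<String> addresses) {
           List<String> cleaned = new ArrayList<>();
           for (String address : addresses) {
               String trimmed = address.trim();
               if (!trimmed.isEmpty()) {
                   cleaned.add(trimmed);
               }
           }
           return String.join(",", cleaned);
       }

       // Returns only the voting members (entries without the learner suffix).
       static List<String> voters(List<String> addresses) {
           List<String> result = new ArrayList<>();
           for (String address : addresses) {
               String trimmed = address.trim();
               if (!trimmed.isEmpty() && !trimmed.endsWith(LEARNER_SUFFIX)) {
                   result.add(trimmed);
               }
           }
           return Collections.unmodifiableList(result);
       }
   }
   ```

   Splitting out the voters this way would also make it easy to check the README's rule that the number of masters should be odd.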



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.RpcOptions;
+import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.StoreEngineOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+import com.alipay.sofa.jraft.rhea.storage.StorageType;
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+public class RaftRegistry implements Registry {
+
+    private final Map<String, DistributedLock<byte[]>> distributedLockMap = new ConcurrentHashMap<>();
+
+    private final RheaKVStore kvStore;
+
+    private final RaftRegistryProperties properties;
+
+    private SubscribeListenerManager subscribeListenerManager;
+
+    private static final String REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS = "worker-groups";
+
+    private static final String API_TYPE = "api";
+
+    public RaftRegistry(RaftRegistryProperties properties) {
+        this.properties = properties;
+        //init RheaKVStore
+        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+                .withFake(true) // use a fake pd
+                .config();
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setElectionTimeoutMs((int) properties.getElectionTimeout().toMillis());
+        final Endpoint serverAddress = new Endpoint(properties.getServerAddress(), properties.getServerPort());
+        final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured()
+                .withStorageType(StorageType.Memory)
+                .withRaftDataPath(properties.getLogStorageDir())
+                .withServerAddress(serverAddress)
+                .withCommonNodeOptions(nodeOptions)
+                .withKvRpcCoreThreads(properties.getRpcCoreThreads())
+                .config();
+        RpcOptions rpcOptions = new RpcOptions();
+        rpcOptions.setCallbackExecutorCorePoolSize(properties.getRpcCoreThreads());
+        rpcOptions.setRpcTimeoutMillis((int) properties.getRpcTimeoutMillis().toMillis());
+        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
+                .withClusterName(properties.getClusterName())
+                .withUseParallelCompress(true)
+                .withInitialServerList(properties.getServerAddressList())
+                .withStoreEngineOptions(storeOpts)
+                .withPlacementDriverOptions(pdOpts)
+                .withRpcOptions(rpcOptions)
+                .config();
+        this.kvStore = new DefaultRheaKVStore();
+        this.kvStore.init(opts);
+        log.info("kvStore started...");
+        if (!properties.getModule().equalsIgnoreCase(API_TYPE)) {
+            this.subscribeListenerManager = new SubscribeListenerManager(properties, kvStore);
+            subscribeListenerManager.start();
+        }
+    }
+
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        return subscribeListenerManager.addSubscribeListener(path, listener);
+    }
+
+    @Override
+    public void unsubscribe(String path) {
+        subscribeListenerManager.removeSubscribeListener(path);
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        final String groupId = properties.getClusterName() + "--1";
+        final Node node = NodeManager.getInstance().get(groupId, new PeerId((properties.getServerAddress()), properties.getServerPort()));
+        node.addReplicatorStateListener(new RaftConnectionStateListener(listener));
+    }
+
+    @Override
+    public String get(String key) {
+        return readUtf8(kvStore.bGet(key));
+    }
+
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        kvStore.bPut(key, writeUtf8(value));
+        if (key.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
+            addWorkerGroup(key);
+        }
+    }
+
+    private void addWorkerGroup(String key) {
+        List<String> workerGroupList = getWorkerGroups();
+        String workerGroup = key.substring(key.indexOf(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)
+                + Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS.length() + 1, key.lastIndexOf(Constants.SINGLE_SLASH));
+        if (!workerGroupList.contains(workerGroup)) {
+            workerGroupList.add(workerGroup);
+            kvStore.bPut(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS, writeUtf8(JSONUtils.toJsonString(workerGroupList)));
+        }
+    }
+
+    private List<String> getWorkerGroups() {
+        final String storedWorkerGroup = readUtf8(kvStore.bGet(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS));
+        if (StringUtils.isEmpty(storedWorkerGroup)) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(JSONUtils.toList(storedWorkerGroup, String.class));
+    }
+
+    @Override
+    public void delete(String key) {
+        kvStore.bDelete(key);
+        final DistributedLock<byte[]> distributedLock = distributedLockMap.get(key);
+        if (distributedLock != null) {
+            distributedLock.unlock();
+        }
+        distributedLockMap.remove(key);
+    }
+
+    @Override
+    public Collection<String> children(String key) {
+        List<String> children = new ArrayList<>();
+        if (key.equals(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)) {
+            //get all worker groups
+            children = getWorkerGroups();
+
+        } else {
+            final List<KVEntry> result = kvStore.bScan(key, key + Constants.SINGLE_SLASH + Constants.RANDOM_STRING);
+            if (result.isEmpty()) {
+                return new ArrayList<>();
+            }
+            for (final KVEntry kv : result) {
+                final String entryKey = readUtf8(kv.getKey());
+                if (StringUtils.isEmpty(readUtf8(kv.getValue())) || StringUtils.isEmpty(entryKey)) {
+                    continue;
+                }
+                String child = entryKey.substring(entryKey.lastIndexOf(Constants.SINGLE_SLASH) + 1);
+                children.add(child);
+            }
+        }
+        children.sort(Comparator.reverseOrder());
+        return children;
+    }
+
+    @Override
+    public boolean exists(String key) {
+        return kvStore.bContainsKey(key);
+    }
+
+    @Override
+    public boolean acquireLock(String key) {
+        final DistributedLock<byte[]> distributedLock = kvStore.getDistributedLock(key, properties.getDistributedLockTimeout().toMillis(), TimeUnit.MILLISECONDS);
+        final boolean lock = distributedLock.tryLock();

Review Comment:
   Can this method return false? `acquireLock` is supposed to block until the lock is acquired.
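   If `acquireLock` has to block, one option is to keep calling the non-blocking `tryLock()` until it succeeds. A minimal sketch, assuming only `tryLock()` is used; the `BooleanSupplier` stands in for jraft's `DistributedLock#tryLock`, and `BlockingLockAcquirer` and the retry interval are hypothetical:

   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.function.BooleanSupplier;

   // Hypothetical helper: turns a non-blocking tryLock into a blocking acquire.
   final class BlockingLockAcquirer {

       private final long retryIntervalMillis;

       BlockingLockAcquirer(long retryIntervalMillis) {
           this.retryIntervalMillis = retryIntervalMillis;
       }

       // Retries tryLock until it returns true. Returns false only if the
       // calling thread is interrupted, and restores the interrupt flag then.
       boolean acquire(BooleanSupplier tryLock) {
           while (!tryLock.getAsBoolean()) {
               try {
                   TimeUnit.MILLISECONDS.sleep(retryIntervalMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return false;
               }
           }
           return true;
       }
   }
   ```

   In `acquireLock` this could be called as `acquirer.acquire(distributedLock::tryLock)` before storing the lock in `distributedLockMap`. A fixed sleep keeps the sketch simple; a real implementation might prefer back-off, or a timed `tryLock` if the lock API offers one.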



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import java.time.Duration;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import lombok.Data;
+
+@Data
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+@ConfigurationProperties(prefix = "registry")
+public class RaftRegistryProperties {
+    private String clusterName;
+    private String serverAddressList;

Review Comment:
   Do we need to add a validator for this class?
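   Spring Boot can validate `@ConfigurationProperties` classes annotated with `@Validated` through JSR-303 constraints such as `@NotBlank`, provided a Bean Validation implementation is on the classpath. Without that dependency, plain startup checks also work. A minimal sketch; `RaftPropertiesCheck` and its rules are assumptions, including the rule that a node's own `server-address:server-port` must appear in `server-address-list`:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical startup check; takes only the fields it validates.
   final class RaftPropertiesCheck {

       private static final String LEARNER_SUFFIX = "/learner";

       private RaftPropertiesCheck() {
       }

       // Collects every problem instead of failing on the first one, so a
       // misconfigured node reports all issues in one startup error.
       static List<String> validate(String clusterName, String serverAddressList,
                                    String serverAddress, int serverPort, String logStorageDir) {
           List<String> errors = new ArrayList<>();
           if (isBlank(clusterName)) {
               errors.add("registry.cluster-name must not be blank");
           }
           if (isBlank(serverAddressList)) {
               errors.add("registry.server-address-list must not be blank");
           }
           if (isBlank(serverAddress)) {
               errors.add("registry.server-address must not be blank");
           }
           if (serverPort < 1 || serverPort > 65535) {
               errors.add("registry.server-port must be between 1 and 65535");
           }
           if (isBlank(logStorageDir)) {
               errors.add("registry.log-storage-dir must not be blank");
           }
           String self = serverAddress + ":" + serverPort;
           if (!isBlank(serverAddressList) && !isBlank(serverAddress) && !containsSelf(serverAddressList, self)) {
               errors.add("registry.server-address-list must contain " + self);
           }
           return errors;
       }

       // Compares each comma-separated entry, ignoring the learner suffix.
       private static boolean containsSelf(String serverAddressList, String self) {
           for (String entry : serverAddressList.split(",")) {
               String peer = entry.trim();
               if (peer.endsWith(LEARNER_SUFFIX)) {
                   peer = peer.substring(0, peer.length() - LEARNER_SUFFIX.length());
               }
               if (peer.equals(self)) {
                   return true;
               }
           }
           return false;
       }

       private static boolean isBlank(String value) {
           return value == null || value.trim().isEmpty();
       }
   }
   ```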



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java:
##########
@@ -0,0 +1,44 @@
+public class RaftRegistryProperties {
+    private String clusterName;
+    private String serverAddressList;
+    private String serverAddress;
+    private int serverPort;
+    private String logStorageDir;
+    private Duration distributedLockTimeout = Duration.ofSeconds(3);
+    private Duration electionTimeout = Duration.ofMillis(1000);
+    private Duration listenerCheckInterval = Duration.ofSeconds(2);
+    private String module = "master";
+    private int rpcCoreThreads = 8;

Review Comment:
   ```suggestion
       private int rpcCoreThreads = Runtime.getRuntime().availableProcessors();
   ```



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/SubscribeListenerManager.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class SubscribeListenerManager implements AutoCloseable {
+
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+
+    private final RaftRegistryProperties properties;
+
+    private final RheaKVStore kvStore;
+
+    public SubscribeListenerManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
+        this.properties = properties;
+        this.kvStore = kvStore;
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
+            1,
+            new ThreadFactoryBuilder().setNameFormat("SubscribeListenerCheckThread").setDaemon(true).build());
+
+    public void start() {
+        scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        dataSubScribeMap.clear();
+        scheduledExecutorService.shutdown();
+    }
+
+    public boolean addSubscribeListener(String path, SubscribeListener listener) {
+        return dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
+    }
+
+    public void removeSubscribeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    private class SubscribeCheckTask implements Runnable {
+
+        private final Map<String, String> nodeDataMap = new ConcurrentHashMap<>();
+
+        @Override
+        public void run() {

Review Comment:
   We should make sure this method never throws an exception; otherwise the `ScheduledExecutorService` will suppress all subsequent executions of the task.
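   `ScheduledExecutorService#scheduleWithFixedDelay` suppresses all subsequent executions once a run throws. A minimal sketch of a wrapper that logs and swallows the failure so the next check still runs; `ExceptionSafeRunnable` is a hypothetical name, and `java.util.logging` stands in for the `@Slf4j` logger used elsewhere in the PR:

   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical wrapper: keeps a periodic task alive after a failed run.
   final class ExceptionSafeRunnable implements Runnable {

       private static final Logger LOG = Logger.getLogger(ExceptionSafeRunnable.class.getName());

       private final Runnable delegate;

       ExceptionSafeRunnable(Runnable delegate) {
           this.delegate = delegate;
       }

       @Override
       public void run() {
           try {
               delegate.run();
           } catch (Throwable t) {
               // Swallow and log: a throwable escaping run() would make
               // scheduleWithFixedDelay suppress all later executions.
               LOG.log(Level.WARNING, "Subscribe check task failed, will retry on next tick", t);
           }
       }
   }
   ```

   `start()` could then schedule `new ExceptionSafeRunnable(new SubscribeCheckTask())`, or the same try/catch could live directly inside `SubscribeCheckTask.run()`.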



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java:
##########
@@ -0,0 +1,228 @@
+    @Override
+    public Collection<String> children(String key) {
+        List<String> children = new ArrayList<>();
+        if (key.equals(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)) {

Review Comment:
   Why do we need this check here? This method should just return the children under the given key.
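   A generic `children` could derive the direct children from the stored keys instead of special-casing the workers path and maintaining a separate `worker-groups` entry. A minimal sketch over an in-memory sorted map, assuming `/`-separated keys; the `TreeMap` stands in for a RheaKV range scan, and `FlatKeyChildren` is a hypothetical name:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.SortedSet;
   import java.util.TreeSet;

   // Hypothetical sketch: derives direct children of a path from a flat, sorted key space.
   final class FlatKeyChildren {

       private FlatKeyChildren() {
       }

       // Returns the distinct first path segments below parent, so
       // "/nodes/worker" yields worker groups even though only the full
       // "/nodes/worker/<group>/<host>" keys are stored.
       static List<String> children(NavigableMap<String, String> store, String parent) {
           String prefix = parent.endsWith("/") ? parent : parent + "/";
           SortedSet<String> result = new TreeSet<>();
           // Keys starting with prefix sort contiguously from prefix onward.
           for (String key : store.tailMap(prefix, true).keySet()) {
               if (!key.startsWith(prefix)) {
                   break;
               }
               String rest = key.substring(prefix.length());
               int slash = rest.indexOf('/');
               String child = slash < 0 ? rest : rest.substring(0, slash);
               if (!child.isEmpty()) {
                   result.add(child);
               }
           }
           return new ArrayList<>(result);
       }
   }
   ```

   With this approach a worker group no longer appears in `children` once its last worker key is deleted. The sketch returns children in ascending order, whereas the PR sorts them with `Comparator.reverseOrder()`.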


