Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/07/25 16:08:09 UTC

[GitHub] [dolphinscheduler] zhuxt2015 opened a new pull request, #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

zhuxt2015 opened a new pull request, #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144

   <!--Thanks very much for contributing to Apache DolphinScheduler. Please review https://dolphinscheduler.apache.org/en-us/community/development/pull-request.html before opening a pull request.-->
   
   
   ## Purpose of the pull request
   fix #10874
   <!--(For example: This pull request adds checkstyle plugin).-->
   
   ## Brief change log
   
   <!--*(for example:)*
     - *Add maven-checkstyle-plugin to root pom.xml*
   -->
   ## Verify this pull request
   
   <!--*(Please pick either of the following options)*-->
   
   This pull request is code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   <!--*(example:)*
     - *Added dolphinscheduler-dao tests for end-to-end.*
     - *Added CronUtilsTest to verify the change.*
     - *Manually verified the change by testing locally.* -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1200327945

   SonarCloud Quality Gate failed. [Dashboard](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144)
   
   - [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=BUG), rating A
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=VULNERABILITY), rating A
   - [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=SECURITY_HOTSPOT), rating A
   - [0 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=CODE_SMELL), rating A
   
   - [0.0% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_coverage&view=list)
   - [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_duplicated_lines_density&view=list)
   
   



[GitHub] [dolphinscheduler] zhuxt2015 commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1200449247

   > please add related license, refer to https://dolphinscheduler.apache.org/en-us/community/development/DS-License.html
   
   Done.



[GitHub] [dolphinscheduler] zhuxt2015 commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r935497195


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/README.md:
##########
@@ -0,0 +1,34 @@
+# Introduction
+
+This module is the Raft consensus algorithm registry plugin. It uses a Raft cluster as the registry center.
+
+# How to use
+
+To use Raft as the registry center, set the registry properties in the `application.yml` of the master, worker, and api servers.
+
+In `server-address-list`, append the `/learner` suffix to the worker and api addresses to indicate that they do not participate in leader election.
+
+Remember to change `server-port` in each `application.yml`.
+
+NOTE: In production, deploy an odd number of masters (e.g. 3 or 5) to achieve high availability.
+
+```yaml
+registry:
+  type: raft
+  cluster-name: dolphinscheduler
+  server-address-list: 127.0.0.1:8181,127.0.0.1:8182/learner,127.0.0.1:8183/learner
+  log-storage-dir: raft-data/
+  election-timeout: 1000ms
+  listener-check-interval: 2s
+  distributed-lock-timeout: 3s
+  server-address: 127.0.0.1
+  server-port: 8183
+  module: api
+  rpc-core-threads: 8
+  rpc-timeout-millis: 5000ms

Review Comment:
   yes
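The `/learner` convention and the odd-master advice can be made concrete with a small parser. This is a minimal sketch, assuming `server-address-list` is a comma-separated list of `host:port` entries in which a trailing `/learner` marks a non-voting member, as in the README example above; `ServerAddressList`, `quorum`, and `faultTolerance` are hypothetical names, not part of the plugin or of JRaft.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a "server-address-list" value into voting
// members (masters) and non-voting learners (workers and api servers).
final class ServerAddressList {
    static final String LEARNER_SUFFIX = "/learner";

    final List<String> voters = new ArrayList<>();
    final List<String> learners = new ArrayList<>();

    static ServerAddressList parse(String raw) {
        ServerAddressList result = new ServerAddressList();
        for (String entry : raw.split(",")) {
            String peer = entry.trim();
            if (peer.isEmpty()) {
                continue; // tolerate stray commas and whitespace
            }
            if (peer.endsWith(LEARNER_SUFFIX)) {
                // A learner receives replicated data but never votes or becomes leader.
                result.learners.add(peer.substring(0, peer.length() - LEARNER_SUFFIX.length()));
            } else {
                result.voters.add(peer);
            }
        }
        return result;
    }

    // Votes needed to elect a leader or commit an entry: a strict majority of voters.
    int quorum() {
        return voters.size() / 2 + 1;
    }

    // How many voters may fail while a majority can still be formed.
    int faultTolerance() {
        return voters.size() - quorum();
    }
}
```

With the README example there is a single voter, so the quorum is 1 and no master failure can be tolerated. Three voters need a quorum of 2 and survive one failure; a fourth voter raises the quorum to 3 but still survives only one failure, which is why an odd number of masters is recommended.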




[GitHub] [dolphinscheduler] zhuxt2015 commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r934004544


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RocksDBOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.StoreEngineOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.StorageType;
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+public class RaftRegistry implements Registry {
+
+    private final Map<String, DistributedLock<byte[]>> distributedLockMap = new ConcurrentHashMap<>();
+
+    private RheaKVStore kvStore;
+
+    private RaftRegistryProperties properties;
+
+    private EphemeralNodeManager ephemeralNodeManager;
+
+    public RaftRegistry(RaftRegistryProperties properties) {
+        this.properties = properties;
+        //init RheaKVStore
+        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+                .withFake(true) // use a fake pd
+                .config();
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setElectionTimeoutMs((int) properties.getElectionTimeout().toMillis());
+        nodeOptions.setSnapshotIntervalSecs((int) properties.getSnapshotInterval().getSeconds());
+        final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured()
+                .withStorageType(StorageType.RocksDB)
+                .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(properties.getDbStorageDir()).config())
+                .withRaftDataPath(properties.getLogStorageDir())
+                .withServerAddress(new Endpoint(properties.getServerAddress(), properties.getServerPort()))
+                .withCommonNodeOptions(nodeOptions)
+                .config();
+        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
+                .withClusterName(properties.getClusterName())
+                .withUseParallelCompress(true)
+                .withInitialServerList(properties.getServerAddressList())
+                .withStoreEngineOptions(storeOpts)
+                .withPlacementDriverOptions(pdOpts)
+                .config();
+        this.kvStore = new DefaultRheaKVStore();
+        this.kvStore.init(opts);
+        log.info("kvStore started...");
+        this.ephemeralNodeManager = new EphemeralNodeManager(properties, kvStore);
+    }
+
+    @PostConstruct
+    public void start() {
+        ephemeralNodeManager.start();
+    }
+
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        return ephemeralNodeManager.addSubscribeListener(path, listener);
+    }
+
+    @Override
+    public void unsubscribe(String path) {
+        ephemeralNodeManager.removeSubscribeListener(path);
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        ephemeralNodeManager.addConnectionListener(listener);
+    }
+
+    @Override
+    public String get(String key) {
+        return readUtf8(kvStore.bGet(key));
+    }
+
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        if (StringUtils.isBlank(value)) {
+            return;
+        }
+        readUtf8(kvStore.bGetAndPut(key, writeUtf8(value)));
+        ephemeralNodeManager.putHandler(key, value);
+    }
+
+    @Override
+    public void delete(String key) {
+        kvStore.bDelete(key);
+        final DistributedLock<byte[]> distributedLock = distributedLockMap.get(key);
+        if (distributedLock != null) {
+            distributedLock.unlock();
+        }
+        distributedLockMap.remove(key);
+        ephemeralNodeManager.deleteHandler(key);
+
+    }
+
+    @Override
+    public Collection<String> children(String key) {
+        final String result = readUtf8(kvStore.bGet(key));
+        if (StringUtils.isEmpty(result)) {
+            return new ArrayList<>();
+        }
+        final List<String> children = JSONUtils.toList(result, String.class);
+        children.sort(Comparator.reverseOrder());
+        return children;
+    }
+
+    @Override
+    public boolean exists(String key) {
+        return kvStore.bContainsKey(key);
+    }
+
+    @Override
+    public boolean acquireLock(String key) {
+        final DistributedLock<byte[]> distributedLock = kvStore.getDistributedLock(key, properties.getDistributedLockTimeout().toMillis(), TimeUnit.MILLISECONDS);
+        final boolean lock = distributedLock.tryLock();
+        if (lock) {
+            distributedLockMap.put(key, distributedLock);
+        }
+        return lock;

Review Comment:
   Not needed.
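The `acquireLock`/`delete` pair in `RaftRegistry` keeps every acquired lock in `distributedLockMap` so that the same handle can be released later. The sketch below isolates that bookkeeping pattern. It is a minimal sketch, assuming a lock handle with `tryLock()`/`unlock()`; `SimpleLock` is a hypothetical stand-in for RheaKV's `DistributedLock`, and `LockBook`, `acquire`, and `release` are illustrative names. One design choice differs from the diff: release calls `Map.remove` first and unlocks the returned handle, so two concurrent releases cannot unlock the same handle twice.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a distributed lock handle.
interface SimpleLock {
    boolean tryLock();
    void unlock();
}

// Tracks the locks this process currently holds, keyed by registry path.
final class LockBook {
    private final Map<String, SimpleLock> held = new ConcurrentHashMap<>();
    private final Function<String, SimpleLock> lockFactory;

    LockBook(Function<String, SimpleLock> lockFactory) {
        this.lockFactory = lockFactory;
    }

    boolean acquire(String key) {
        SimpleLock lock = lockFactory.apply(key);
        if (!lock.tryLock()) {
            return false;
        }
        held.put(key, lock); // remember the exact handle that owns the lock
        return true;
    }

    boolean release(String key) {
        // remove() is atomic: only one caller gets the handle back and unlocks it.
        SimpleLock lock = held.remove(key);
        if (lock == null) {
            return false;
        }
        lock.unlock();
        return true;
    }

    boolean isHeld(String key) {
        return held.containsKey(key);
    }
}
```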




Re: [PR] [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency [dolphinscheduler]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1935134430

   This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 7 days if no further activity occurs.



[GitHub] [dolphinscheduler] leo-lao commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
leo-lao commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r935333423


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftConnectionStateListener.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.core.Replicator;
+import com.alipay.sofa.jraft.entity.PeerId;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RaftConnectionStateListener implements Replicator.ReplicatorStateListener {
+    private final ConnectionListener connectionListener;
+    private ConnectionState connectionState;
+
+    public RaftConnectionStateListener(ConnectionListener connectionListener) {
+        this.connectionListener = connectionListener;
+    }
+
+    @Override
+    public void onCreated(PeerId peerId) {
+        log.info("{}:{} created...", peerId.getIp(), peerId.getPort());
+    }
+
+    @Override
+    public void onError(PeerId peerId, Status status) {
+        log.error("{}:{} an error occurred, {}", peerId.getIp(), peerId.getPort(), status.getErrorMsg());
+    }
+
+    @Override
+    public void onDestroyed(PeerId peerId) {
+        log.info("{}:{} destroyed...", peerId.getIp(), peerId.getPort());
+    }
+
+    @Override
+    public void stateChanged(PeerId peer, ReplicatorState newState) {
+        switch (newState) {

Review Comment:
   Is it possible to receive state-change events for other peers?
   If so, you cannot assign `connectionState` directly from `newState`.
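If the listener does receive events for other peers (as far as we understand, JRaft runs one replicator per follower on the leader, so this seems likely), the per-peer events have to be combined rather than copied into one shared field. This is a minimal sketch, assuming a "connected while a majority is reachable" rule; `PeerState`, `ClusterView`, and `PeerStateTracker` are hypothetical names, not JRaft or DolphinScheduler types, and the policy is an illustration, not what the PR implements. It also reports a value only when the derived state changes, so the listener is not called for irrelevant events.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in for JRaft's Replicator.ReplicatorState values.
enum PeerState { CREATED, ONLINE, OFFLINE, DESTROYED }

// Stand-in for a cluster-level connection state (two values, for illustration).
enum ClusterView { CONNECTED, SUSPENDED }

// Hypothetical tracker: keeps the last reported state of every peer and derives
// one cluster-level view, instead of letting the latest event for any single
// peer overwrite a shared field.
final class PeerStateTracker {
    private final Map<String, PeerState> peers = new HashMap<>();
    private final int votingMembers; // cluster size, including the local node
    private ClusterView lastReported;

    PeerStateTracker(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    // Returns the new view only when it differs from the last one reported.
    synchronized Optional<ClusterView> onStateChanged(String peer, PeerState newState) {
        if (newState == PeerState.DESTROYED) {
            peers.remove(peer);
        } else {
            peers.put(peer, newState);
        }
        ClusterView view = currentView();
        if (view == lastReported) {
            return Optional.empty();
        }
        lastReported = view;
        return Optional.of(view);
    }

    private ClusterView currentView() {
        // The local node counts as reachable; add every peer that is CREATED or ONLINE.
        long reachable = 1 + peers.values().stream()
                .filter(s -> s == PeerState.CREATED || s == PeerState.ONLINE)
                .count();
        // Illustrative policy: connected while a strict majority can still be reached.
        return reachable * 2 > votingMembers ? ClusterView.CONNECTED : ClusterView.SUSPENDED;
    }
}
```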



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftConnectionStateListener.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.core.Replicator;
+import com.alipay.sofa.jraft.entity.PeerId;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RaftConnectionStateListener implements Replicator.ReplicatorStateListener {
+    private final ConnectionListener connectionListener;
+    private ConnectionState connectionState;
+
+    public RaftConnectionStateListener(ConnectionListener connectionListener) {
+        this.connectionListener = connectionListener;
+    }
+
+    @Override
+    public void onCreated(PeerId peerId) {
+        log.info("{}:{} created...", peerId.getIp(), peerId.getPort());
+    }
+
+    @Override
+    public void onError(PeerId peerId, Status status) {
+        log.error("{}:{} an error occurred, {}", peerId.getIp(), peerId.getPort(), status.getErrorMsg());
+    }
+
+    @Override
+    public void onDestroyed(PeerId peerId) {
+        log.info("{}:{} destroyed...", peerId.getIp(), peerId.getPort());
+    }
+
+    @Override
+    public void stateChanged(PeerId peer, ReplicatorState newState) {
+        switch (newState) {
+            case CREATED:
+                connectionState = ConnectionState.CONNECTED;
+                break;
+            case ONLINE:
+                if (connectionState == ConnectionState.DISCONNECTED || connectionState == ConnectionState.SUSPENDED) {
+                    connectionState = ConnectionState.RECONNECTED;
+                }
+                break;
+            case OFFLINE:
+                connectionState = ConnectionState.SUSPENDED;
+                break;
+            case DESTROYED:
+                connectionState = ConnectionState.DISCONNECTED;
+                break;
+            default:
+        }
+        connectionListener.onUpdate(connectionState);
+    }

Review Comment:
   I am confused by the `stateChanged` method: I still cannot figure out how the states of the Raft nodes switch, or how the master path on the KVStore gets removed.
   
   Suppose peer A (a master) is down and is evicted from the Raft cluster. Then its `stateChanged` method gets invoked, but only `connectionState` changes, and then `MasterConnectionStateListener` is invoked.
   So far I haven't seen any logic that deletes the path on the KVStore.
   
   I don't think the path of a dead master will be deleted automatically unless the path on the KVStore is an `object with a TTL`, that is, something like an ephemeral node in ZooKeeper.
   
   To conclude, there are two ways to get a correct state machine:
   1. An `object with a TTL` that represents the master status (refreshed by heartbeats)
   2. Periodic checking logic (like the MySQL registry)
   
   But in the Raft registry I see neither of these.
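The two options above can be combined: each ephemeral entry carries an expiry that its owner pushes forward on every heartbeat, and a periodic sweep deletes entries whose owner stopped heartbeating. The in-memory sketch below shows only that logic. `TtlRegistry`, `putEphemeral`, and `sweepExpired` are hypothetical names; a real Raft registry would have to keep the expiry inside the replicated KV store (and decide which node runs the sweep) rather than in a local map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical in-memory model of TTL-backed ephemeral registry entries.
final class TtlRegistry {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected so expiry is deterministic in tests

    TtlRegistry(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Called on registration and again on every heartbeat to push the deadline forward.
    void putEphemeral(String key, String value) {
        entries.put(key, new Entry(value, clock.getAsLong() + ttlMillis));
    }

    String get(String key) {
        Entry e = entries.get(key);
        return e == null ? null : e.value;
    }

    // Meant to run periodically (e.g. from a ScheduledExecutorService). Removes every
    // expired entry and returns the removed keys so subscribers can be notified.
    List<String> sweepExpired() {
        long now = clock.getAsLong();
        List<String> removed = new ArrayList<>();
        entries.forEach((key, entry) -> {
            // remove(key, entry) skips entries refreshed by a concurrent heartbeat.
            if (entry.expiresAtMillis <= now && entries.remove(key, entry)) {
                removed.add(key);
            }
        });
        return removed;
    }
}
```

In production the clock would simply be `System::currentTimeMillis`. The TTL should span several heartbeat intervals so that one delayed heartbeat does not evict a live master.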




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1201319769

   SonarCloud Quality Gate failed. [Dashboard](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144)
   
   - [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=BUG), rating A
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=VULNERABILITY), rating A
   - [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=SECURITY_HOTSPOT), rating A
   - [0 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=CODE_SMELL), rating A
   
   - [0.0% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_coverage&view=list)
   - [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_duplicated_lines_density&view=list)
   
   



[GitHub] [dolphinscheduler] caishunfeng commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1221783616

   Hi @zhuxt2015, please check the review comments and resolve the conflicts.



[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1200453207

   SonarCloud Quality Gate failed. [Dashboard](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144)
   
   - [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=BUG), rating A
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=VULNERABILITY), rating A
   - [1 Security Hotspot](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=SECURITY_HOTSPOT), rating E
   - [6 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=CODE_SMELL), rating A
   
   - [0.0% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_coverage&view=list)
   - [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1200327843

   SonarCloud [Quality Gate failed](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144).
   
   Bug: rating A, [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=BUG)  
   Vulnerability: rating A, [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=VULNERABILITY)  
   Security Hotspot: rating A, [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=SECURITY_HOTSPOT)  
   Code Smell: rating A, [0 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11144&resolved=false&types=CODE_SMELL)
   
   [0.0% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_coverage&view=list)  
   [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11144&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] zhuxt2015 commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r956717444


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftConnectionStateListener.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.core.Replicator;
+import com.alipay.sofa.jraft.entity.PeerId;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RaftConnectionStateListener implements Replicator.ReplicatorStateListener {
+    private final ConnectionListener connectionListener;
+    private ConnectionState connectionState;
+
+    public RaftConnectionStateListener(ConnectionListener connectionListener) {
+        this.connectionListener = connectionListener;
+    }
+
+    @Override
+    public void onCreated(PeerId peerId) {
+        log.info("{}:{} created...", peerId.getIp(), peerId.getPort());
+    }
+
+    @Override
+    public void onError(PeerId peerId, Status status) {
+        log.error("{}:{} an error occurred, {}", peerId.getIp(), peerId.getPort(), status.getErrorMsg());
+    }
+
+    @Override
+    public void onDestroyed(PeerId peerId) {
+        log.info("{}:{} destroyed...", peerId.getIp(), peerId.getPort());
+    }
+
+    @Override
+    public void stateChanged(PeerId peer, ReplicatorState newState) {
+        switch (newState) {

Review Comment:
   This listener does not get the state of the other peers.
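   As noted in this comment, `Replicator.ReplicatorStateListener` does not report the state of the other peers; the `EphemeralNodeManager` in this PR instead infers connectivity from heartbeat timestamps (`ConnectionCheckTask`). The following is a minimal sketch of that kind of transition logic, not the PR's code: `HeartbeatConnectionTracker` and its `State` enum are hypothetical names, and the expiry window is assumed to be `expireFactor * checkIntervalMillis`, mirroring `getConnectionExpireFactor() * getListenerCheckInterval()` in the diff.

```java
import java.util.Optional;

/**
 * Hypothetical sketch: derives this node's connection state from the age of
 * its last heartbeat rather than from JRaft replicator callbacks.
 */
final class HeartbeatConnectionTracker {

    /** Local stand-in for the registry API's connection states. */
    enum State { CONNECTED, DISCONNECTED, RECONNECTED }

    private final long expireMillis;

    /** Last reported state: null (never connected), CONNECTED or DISCONNECTED. */
    private State current;

    /** Expiry window = expireFactor * checkIntervalMillis (assumed, mirroring the diff). */
    HeartbeatConnectionTracker(long expireFactor, long checkIntervalMillis) {
        this.expireMillis = expireFactor * checkIntervalMillis;
    }

    /**
     * Called once per check. Returns the state to report to listeners, or
     * empty when nothing changed since the previous check.
     */
    Optional<State> observe(long lastHeartbeatMillis, long nowMillis) {
        boolean alive = nowMillis - lastHeartbeatMillis <= expireMillis;
        if (alive) {
            if (current == State.CONNECTED) {
                return Optional.empty(); // still connected: nothing to report
            }
            // First contact reports CONNECTED; recovery after an outage reports RECONNECTED.
            State event = (current == State.DISCONNECTED) ? State.RECONNECTED : State.CONNECTED;
            current = State.CONNECTED;
            return Optional.of(event);
        }
        if (current != State.CONNECTED) {
            return Optional.empty(); // never connected, or already reported as disconnected
        }
        current = State.DISCONNECTED;
        return Optional.of(State.DISCONNECTED);
    }
}
```

   Unlike `checkConnection` in the diff, which triggers `CONNECTED` on every healthy check, this sketch reports only state changes, so listeners are not notified repeatedly.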





[GitHub] [dolphinscheduler] zhuxt2015 commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r956712255


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.RpcOptions;
+import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.StoreEngineOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+import com.alipay.sofa.jraft.rhea.storage.StorageType;
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+public class RaftRegistry implements Registry {
+
+    private final Map<String, DistributedLock<byte[]>> distributedLockMap = new ConcurrentHashMap<>();
+
+    private final RheaKVStore kvStore;
+
+    private final RaftRegistryProperties properties;
+
+    private SubscribeListenerManager subscribeListenerManager;
+
+    private static final String REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS = "worker-groups";
+
+    private static final String API_TYPE = "api";
+
+    public RaftRegistry(RaftRegistryProperties properties) {
+        this.properties = properties;
+        //init RheaKVStore
+        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+                .withFake(true) // use a fake pd
+                .config();
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setElectionTimeoutMs((int) properties.getElectionTimeout().toMillis());
+        final Endpoint serverAddress = new Endpoint(properties.getServerAddress(), properties.getServerPort());
+        final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured()
+                .withStorageType(StorageType.Memory)
+                .withRaftDataPath(properties.getLogStorageDir())
+                .withServerAddress(serverAddress)
+                .withCommonNodeOptions(nodeOptions)
+                .withKvRpcCoreThreads(properties.getRpcCoreThreads())
+                .config();
+        RpcOptions rpcOptions = new RpcOptions();
+        rpcOptions.setCallbackExecutorCorePoolSize(properties.getRpcCoreThreads());
+        rpcOptions.setRpcTimeoutMillis((int) properties.getRpcTimeoutMillis().toMillis());
+        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
+                .withClusterName(properties.getClusterName())
+                .withUseParallelCompress(true)
+                .withInitialServerList(properties.getServerAddressList())
+                .withStoreEngineOptions(storeOpts)
+                .withPlacementDriverOptions(pdOpts)
+                .withRpcOptions(rpcOptions)
+                .config();
+        this.kvStore = new DefaultRheaKVStore();
+        this.kvStore.init(opts);
+        log.info("kvStore started...");
+        if (!properties.getModule().equalsIgnoreCase(API_TYPE)) {
+            this.subscribeListenerManager = new SubscribeListenerManager(properties, kvStore);
+            subscribeListenerManager.start();
+        }
+    }
+
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        return subscribeListenerManager.addSubscribeListener(path, listener);
+    }
+
+    @Override
+    public void unsubscribe(String path) {
+        subscribeListenerManager.removeSubscribeListener(path);
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        final String groupId = properties.getClusterName() + "--1";
+        final Node node = NodeManager.getInstance().get(groupId, new PeerId((properties.getServerAddress()), properties.getServerPort()));
+        node.addReplicatorStateListener(new RaftConnectionStateListener(listener));
+    }
+
+    @Override
+    public String get(String key) {
+        return readUtf8(kvStore.bGet(key));
+    }
+
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        kvStore.bPut(key, writeUtf8(value));
+        if (key.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
+            addWorkerGroup(key);
+        }
+    }
+
+    private void addWorkerGroup(String key) {
+        List<String> workerGroupList = getWorkerGroups();
+        String workerGroup = key.substring(key.indexOf(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)
+                + Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS.length() + 1, key.lastIndexOf(Constants.SINGLE_SLASH));
+        if (!workerGroupList.contains(workerGroup)) {
+            workerGroupList.add(workerGroup);
+            kvStore.bPut(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS, writeUtf8(JSONUtils.toJsonString(workerGroupList)));
+        }
+    }
+
+    private List<String> getWorkerGroups() {
+        final String storedWorkerGroup = readUtf8(kvStore.bGet(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS));
+        if (StringUtils.isEmpty(storedWorkerGroup)) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(JSONUtils.toList(storedWorkerGroup, String.class));
+    }
+
+    @Override
+    public void delete(String key) {
+        kvStore.bDelete(key);
+        final DistributedLock<byte[]> distributedLock = distributedLockMap.get(key);
+        if (distributedLock != null) {
+            distributedLock.unlock();
+        }
+        distributedLockMap.remove(key);
+    }
+
+    @Override
+    public Collection<String> children(String key) {
+        List<String> children = new ArrayList<>();
+        if (key.equals(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)) {

Review Comment:
   Because JRaft does not have a getChildren function, the worker groups are stored separately.
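   Without a `getChildren`, this PR keeps the worker-group list under its own key. If the underlying store can iterate keys in sorted order, children can instead be derived from a prefix range. The following is a minimal sketch of that alternative, using an in-memory `TreeMap` as a stand-in for the KV store (an assumption; it does not call any JRaft API), with the hypothetical class name `FlatKeyChildren`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical sketch: emulates ZooKeeper-style getChildren over a flat,
 * sorted key space by scanning the keys that start with "parent/".
 */
final class FlatKeyChildren {

    // In-memory stand-in for an ordered KV store.
    private final NavigableMap<String, String> store = new TreeMap<>();

    void put(String key, String value) {
        store.put(key, value);
    }

    /** Returns the distinct immediate child names of parent, sorted. */
    List<String> children(String parent) {
        String prefix = parent + "/";
        SortedSet<String> names = new TreeSet<>();
        for (String key : store.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break; // keys sharing a prefix are contiguous, so the range has ended
            }
            String rest = key.substring(prefix.length());
            int slash = rest.indexOf('/');
            String child = slash < 0 ? rest : rest.substring(0, slash);
            if (!child.isEmpty()) {
                names.add(child);
            }
        }
        return new ArrayList<>(names);
    }
}
```

   The trade-off is one range scan per `children` call versus an extra write whenever a new worker group appears; which is cheaper depends on how often each happens.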





[GitHub] [dolphinscheduler] leo-lao commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
leo-lao commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r930607744


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 1. EphemeralNodeRefreshThread check current master node connection and check ephemeral node expire time
+ * 2. maintain Map<String, Long> activeMasterServers
+ * 3. maintain Map<String, Long> activeWorkerServers
+ * 4. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, List<String>> master servers
+ * 5. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, List<String>> worker servers
+ * 6. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, List<String>> dead servers
+ */
+@Slf4j
+public class EphemeralNodeManager implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
+
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+
+    private RaftRegistryProperties properties;
+
+
+    private RheaKVStore kvStore;
+
+    public EphemeralNodeManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
+        this.properties = properties;
+        this.kvStore = kvStore;
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
+            2,
+            new ThreadFactoryBuilder().setNameFormat("EphemeralNodeRefreshThread").setDaemon(true).build());
+
+    public void start() {
+        scheduledExecutorService.scheduleWithFixedDelay(new ConnectionCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        connectionListeners.clear();
+        dataSubScribeMap.clear();
+        scheduledExecutorService.shutdown();
+    }
+
+    public void addConnectionListener(ConnectionListener listener) {
+        connectionListeners.add(listener);
+    }
+
+    public boolean addSubscribeListener(String path, SubscribeListener listener) {
+        return dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
+    }
+
+    public void removeSubscribeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    private class ConnectionCheckTask implements Runnable {
+        private ConnectionState connectionState = null;
+
+        @Override
+        public void run() {
+            checkConnection();
+            checkActiveNode();
+        }
+
+        private void checkConnection() {
+            final String host = NetUtils.getHost();
+            if (getActiveMasterServers().keySet().stream().anyMatch(address -> address.split(Constants.COLON)[0].equals(host))) {
+                if (connectionState == null && !connectionListeners.isEmpty()) {
+                    triggerListener(ConnectionState.CONNECTED);
+                } else if (connectionState == ConnectionState.DISCONNECTED) {
+                    triggerListener(ConnectionState.RECONNECTED);
+                } else {
+                    triggerListener(ConnectionState.CONNECTED);
+                }
+                connectionState = ConnectionState.CONNECTED;
+            }
+        }
+
+        private void checkActiveNode() {
+            long expireTime = properties.getConnectionExpireFactor() * properties.getListenerCheckInterval().toMillis();
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            for (Map.Entry<String, Long> entry : activeMasterServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeMasterServers.remove(nodeAddress);
+                    updateActiveMaster(activeMasterServers);
+                    addDeadServer(Constants.MASTER_TYPE, nodeAddress);
+                    if (nodeAddress.split(Constants.COLON)[0].equals(NetUtils.getHost())) {
+                        connectionState = ConnectionState.DISCONNECTED;
+                        triggerListener(ConnectionState.DISCONNECTED);
+                        removeNodeData(nodeAddress);
+                    }
+                    log.warn("Master server {} connect to raft cluster timeout, last heartbeat {}, timeout config {} ms",
+                            nodeAddress, convertTimeToString(entry.getValue()), expireTime);
+                }
+            }
+            Map<String, Long> activeWorkerServers = getActiveWorkerServers();
+            for (Map.Entry<String, Long> entry : activeWorkerServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeWorkerServers.remove(nodeAddress);
+                    updateActiveWorker(nodeAddress, activeWorkerServers);

Review Comment:
   The same issue as above: too many write requests to the KVStore.



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 1. EphemeralNodeRefreshThread check current master node connection and check ephemeral node expire time
+ * 2. maintain Map<String, Long> activeMasterServers
+ * 3. maintain Map<String, Long> activeWorkerServers
+ * 4. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, List<String>> master servers
+ * 5. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, List<String>> worker servers
+ * 6. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, List<String>> dead servers
+ */
+@Slf4j
+public class EphemeralNodeManager implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
+
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+
+    private RaftRegistryProperties properties;
+
+
+    private RheaKVStore kvStore;
+
+    public EphemeralNodeManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
+        this.properties = properties;
+        this.kvStore = kvStore;
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
+            2,
+            new ThreadFactoryBuilder().setNameFormat("EphemeralNodeRefreshThread").setDaemon(true).build());
+
+    public void start() {
+        scheduledExecutorService.scheduleWithFixedDelay(new ConnectionCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        connectionListeners.clear();
+        dataSubScribeMap.clear();
+        scheduledExecutorService.shutdown();
+    }
+
+    public void addConnectionListener(ConnectionListener listener) {
+        connectionListeners.add(listener);
+    }
+
+    public boolean addSubscribeListener(String path, SubscribeListener listener) {
+        return dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
+    }
+
+    public void removeSubscribeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    private class ConnectionCheckTask implements Runnable {
+        private ConnectionState connectionState = null;
+
+        @Override
+        public void run() {
+            checkConnection();
+            checkActiveNode();
+        }
+
+        private void checkConnection() {
+            final String host = NetUtils.getHost();
+            if (getActiveMasterServers().keySet().stream().anyMatch(address -> address.split(Constants.COLON)[0].equals(host))) {
+                if (connectionState == null && !connectionListeners.isEmpty()) {
+                    triggerListener(ConnectionState.CONNECTED);
+                } else if (connectionState == ConnectionState.DISCONNECTED) {
+                    triggerListener(ConnectionState.RECONNECTED);
+                } else {
+                    triggerListener(ConnectionState.CONNECTED);
+                }
+                connectionState = ConnectionState.CONNECTED;
+            }
+        }
+
+        private void checkActiveNode() {
+            long expireTime = properties.getConnectionExpireFactor() * properties.getListenerCheckInterval().toMillis();
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            for (Map.Entry<String, Long> entry : activeMasterServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeMasterServers.remove(nodeAddress);
+                    updateActiveMaster(activeMasterServers);

Review Comment:
   This may issue too many write requests to the KVStore, which would reduce efficiency.
   I think you can accumulate all the expired masters to remove and then update the KVStore once in a batch, rather than in each iteration.
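   The suggestion can be sketched as follows: collect the expired entries in one pass, then write the remaining map back once. `BatchedExpiry` and the `writeBack` callback are hypothetical; `writeBack` stands in for whatever persists the map (for example `updateActiveMaster`). Using `removeIf` also sidesteps a separate hazard in the loop above, which calls `remove` on the map while iterating over its `entrySet()` and can throw `ConcurrentModificationException` if the map is a plain `HashMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the batching suggestion: collect every expired node
 * first, then persist the updated map with a single write.
 */
final class BatchedExpiry {

    /**
     * Removes entries whose last heartbeat is older than expireMillis and, if
     * anything was removed, calls writeBack exactly once with the remaining map.
     *
     * @return the addresses that expired
     */
    static List<String> expire(Map<String, Long> active, long nowMillis, long expireMillis,
                               Consumer<Map<String, Long>> writeBack) {
        List<String> expired = new ArrayList<>();
        // removeIf removes through the iterator, so it is safe on a plain HashMap.
        active.entrySet().removeIf(entry -> {
            boolean stale = nowMillis - entry.getValue() > expireMillis;
            if (stale) {
                expired.add(entry.getKey());
            }
            return stale;
        });
        if (!expired.isEmpty()) {
            writeBack.accept(active); // one KV write per check, not one per expired node
        }
        return expired;
    }
}
```

   Per-node side effects such as `addDeadServer` can still run over the returned list; only the write of the active-server map is batched.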



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RocksDBOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.StoreEngineOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.StorageType;
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+public class RaftRegistry implements Registry {
+
+    private final Map<String, DistributedLock<byte[]>> distributedLockMap = new ConcurrentHashMap<>();
+
+    private RheaKVStore kvStore;
+
+    private RaftRegistryProperties properties;
+
+    private EphemeralNodeManager ephemeralNodeManager;
+
+    public RaftRegistry(RaftRegistryProperties properties) {
+        this.properties = properties;
+        //init RheaKVStore
+        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+                .withFake(true) // use a fake pd
+                .config();
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setElectionTimeoutMs((int) properties.getElectionTimeout().toMillis());
+        nodeOptions.setSnapshotIntervalSecs((int) properties.getSnapshotInterval().getSeconds());
+        final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured()
+                .withStorageType(StorageType.RocksDB)
+                .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(properties.getDbStorageDir()).config())
+                .withRaftDataPath(properties.getLogStorageDir())
+                .withServerAddress(new Endpoint(properties.getServerAddress(), properties.getServerPort()))
+                .withCommonNodeOptions(nodeOptions)
+                .config();
+        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
+                .withClusterName(properties.getClusterName())
+                .withUseParallelCompress(true)
+                .withInitialServerList(properties.getServerAddressList())
+                .withStoreEngineOptions(storeOpts)
+                .withPlacementDriverOptions(pdOpts)
+                .config();
+        this.kvStore = new DefaultRheaKVStore();
+        this.kvStore.init(opts);
+        log.info("kvStore started...");
+        this.ephemeralNodeManager = new EphemeralNodeManager(properties, kvStore);
+    }
+
+    @PostConstruct
+    public void start() {
+        ephemeralNodeManager.start();
+    }
+
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        return ephemeralNodeManager.addSubscribeListener(path, listener);
+    }
+
+    @Override
+    public void unsubscribe(String path) {
+        ephemeralNodeManager.removeSubscribeListener(path);
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        ephemeralNodeManager.addConnectionListener(listener);
+    }
+
+    @Override
+    public String get(String key) {
+        return readUtf8(kvStore.bGet(key));
+    }
+
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        if (StringUtils.isBlank(value)) {
+            return;
+        }
+        readUtf8(kvStore.bGetAndPut(key, writeUtf8(value)));
+        ephemeralNodeManager.putHandler(key, value);
+    }
+
+    @Override
+    public void delete(String key) {
+        kvStore.bDelete(key);
+        final DistributedLock<byte[]> distributedLock = distributedLockMap.get(key);
+        if (distributedLock != null) {
+            distributedLock.unlock();
+        }
+        distributedLockMap.remove(key);
+        ephemeralNodeManager.deleteHandler(key);
+
+    }
+
+    @Override
+    public Collection<String> children(String key) {
+        final String result = readUtf8(kvStore.bGet(key));
+        if (StringUtils.isEmpty(result)) {
+            return new ArrayList<>();
+        }
+        final List<String> children = JSONUtils.toList(result, String.class);
+        children.sort(Comparator.reverseOrder());
+        return children;
+    }
+
+    @Override
+    public boolean exists(String key) {
+        return kvStore.bContainsKey(key);
+    }
+
+    @Override
+    public boolean acquireLock(String key) {
+        final DistributedLock<byte[]> distributedLock = kvStore.getDistributedLock(key, properties.getDistributedLockTimeout().toMillis(), TimeUnit.MILLISECONDS);
+        final boolean lock = distributedLock.tryLock();
+        if (lock) {
+            distributedLockMap.put(key, distributedLock);
+        }
+        return lock;

Review Comment:
   Would it be better to catch the `Unable to get lock` exception and rethrow it as a `RegistryException`?
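One way to read this suggestion, as a JDK-only sketch: wrap the lock attempt so that a failure of the underlying store surfaces as a `RegistryException`, while an ordinary "lock held elsewhere" result still returns `false`. `RegistryException` is declared locally as a stand-in for the registry API's exception, `LockHandle` is a hypothetical stand-in for jraft's `DistributedLock`, and the sketch assumes the store reports failures with unchecked exceptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LockAcquireSketch {

    /** Local stand-in for the registry API's unchecked RegistryException. */
    static class RegistryException extends RuntimeException {
        RegistryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical view of a distributed lock exposing only the call used here. */
    interface LockHandle {
        boolean tryLock();
    }

    // Mirrors distributedLockMap in RaftRegistry: locks this process currently holds
    private final Map<String, LockHandle> heldLocks = new ConcurrentHashMap<>();

    /**
     * Returns false when the lock is held elsewhere; wraps any failure of the
     * underlying store in a RegistryException so callers see one exception type.
     */
    boolean acquireLock(String key, LockHandle lock) {
        final boolean locked;
        try {
            locked = lock.tryLock();
        } catch (RuntimeException e) {
            throw new RegistryException("Unable to get lock for key " + key, e);
        }
        if (locked) {
            heldLocks.put(key, lock);
        }
        return locked;
    }

    public static void main(String[] args) {
        LockAcquireSketch registry = new LockAcquireSketch();
        System.out.println(registry.acquireLock("/lock/master", () -> true));
        try {
            registry.acquireLock("/lock/failover", () -> {
                throw new IllegalStateException("store unavailable");
            });
        } catch (RegistryException e) {
            System.out.println(e.getMessage() + ", caused by: " + e.getCause().getMessage());
        }
    }
}
```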



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 1. EphemeralNodeRefreshThread check current master node connection and check ephemeral node expire time
+ * 2. maintain Map<String, Long> activeMasterServers
+ * 3. maintain Map<String, Long> activeWorkerServers
+ * 4. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, List<String>> master servers
+ * 5. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, List<String>> worker servers
+ * 6. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, List<String>> dead servers
+ */
+@Slf4j
+public class EphemeralNodeManager implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
+
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+
+    private RaftRegistryProperties properties;
+
+
+    private RheaKVStore kvStore;
+
+    public EphemeralNodeManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
+        this.properties = properties;
+        this.kvStore = kvStore;
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
+            2,
+            new ThreadFactoryBuilder().setNameFormat("EphemeralNodeRefreshThread").setDaemon(true).build());
+
+    public void start() {
+        scheduledExecutorService.scheduleWithFixedDelay(new ConnectionCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        connectionListeners.clear();
+        dataSubScribeMap.clear();
+        scheduledExecutorService.shutdown();
+    }
+
+    public void addConnectionListener(ConnectionListener listener) {
+        connectionListeners.add(listener);
+    }
+
+    public boolean addSubscribeListener(String path, SubscribeListener listener) {
+        return dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
+    }
+
+    public void removeSubscribeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    private class ConnectionCheckTask implements Runnable {
+        private ConnectionState connectionState = null;
+
+        @Override
+        public void run() {
+            checkConnection();
+            checkActiveNode();
+        }
+
+        private void checkConnection() {
+            final String host = NetUtils.getHost();
+            if (getActiveMasterServers().keySet().stream().anyMatch(address -> address.split(Constants.COLON)[0].equals(host))) {
+                if (connectionState == null && !connectionListeners.isEmpty()) {
+                    triggerListener(ConnectionState.CONNECTED);
+                } else if (connectionState == ConnectionState.DISCONNECTED) {
+                    triggerListener(ConnectionState.RECONNECTED);
+                } else {
+                    triggerListener(ConnectionState.CONNECTED);
+                }
+                connectionState = ConnectionState.CONNECTED;
+            }
+        }
+
+        private void checkActiveNode() {
+            long expireTime = properties.getConnectionExpireFactor() * properties.getListenerCheckInterval().toMillis();
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            for (Map.Entry<String, Long> entry : activeMasterServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeMasterServers.remove(nodeAddress);
+                    updateActiveMaster(activeMasterServers);
+                    addDeadServer(Constants.MASTER_TYPE, nodeAddress);
+                    if (nodeAddress.split(Constants.COLON)[0].equals(NetUtils.getHost())) {
+                        connectionState = ConnectionState.DISCONNECTED;
+                        triggerListener(ConnectionState.DISCONNECTED);
+                        removeNodeData(nodeAddress);
+                    }
+                    log.warn("Master server {} connect to raft cluster timeout, last heartbeat {}, timeout config {} ms",
+                            nodeAddress, convertTimeToString(entry.getValue()), expireTime);
+                }
+            }
+            Map<String, Long> activeWorkerServers = getActiveWorkerServers();
+            for (Map.Entry<String, Long> entry : activeWorkerServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeWorkerServers.remove(nodeAddress);
+                    updateActiveWorker(nodeAddress, activeWorkerServers);
+                    removeWorkerGroup(nodeAddress);
+                    addDeadServer(Constants.WORKER_TYPE, nodeAddress);
+                    removeNodeData(nodeAddress);
+                    log.warn("Worker server {} connect to raft cluster timeout, last heartbeat {}, timeout config {} ms",
+                            nodeAddress, convertTimeToString(entry.getValue()), expireTime);
+                }
+            }
+        }
+

Review Comment:
   Here I see the `updateActiveMaster` and `updateActiveWorker` methods: they overwrite the whole value under the key `/nodes/master` or `/nodes/worker`, rather than modifying only the records of the masters found expired this time.
   The HeartBeat task does the same.
   So multiple threads on different hosts will try to read and write the same key, which may cause collisions in a distributed environment.
   
   My suggestion: create one key for each master (thus 3 keys for 3 masters), rather than 1 key for all 3 masters.
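A minimal sketch of the one-key-per-master layout, with a `ConcurrentHashMap` standing in for the RheaKVStore. The `/nodes/master/<address>` key layout and all class and method names are hypothetical. Each master's heartbeat writes only its own key, and the expiry sweep deletes a key only if it still holds the stale timestamp it read (`ConcurrentHashMap.remove(key, value)`), so a master that refreshed in the meantime is not removed. A real store would need an equivalent prefix scan and conditional delete, which this sketch does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerNodeHeartbeatSketch {

    static final String MASTER_PREFIX = "/nodes/master/"; // hypothetical key layout

    // Stand-in for the replicated KV store: one entry per master, value = last heartbeat millis
    private final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();

    /** Called by each master's heartbeat task; touches only that master's own key. */
    void heartbeat(String address, long nowMs) {
        store.put(MASTER_PREFIX + address, nowMs);
    }

    /** Deletes only the stale master keys and returns their addresses. */
    List<String> removeExpired(long nowMs, long expireTimeMs) {
        List<String> expired = new ArrayList<>();
        // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe
        for (Map.Entry<String, Long> entry : store.entrySet()) {
            String key = entry.getKey();
            long lastBeat = entry.getValue();
            if (key.startsWith(MASTER_PREFIX)
                    && nowMs - lastBeat > expireTimeMs
                    // conditional delete: skipped if the master refreshed its key meanwhile
                    && store.remove(key, lastBeat)) {
                expired.add(key.substring(MASTER_PREFIX.length()));
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        PerNodeHeartbeatSketch registry = new PerNodeHeartbeatSketch();
        registry.heartbeat("10.0.0.1:5678", 1_000L);
        registry.heartbeat("10.0.0.2:5678", 9_000L);
        System.out.println("expired: " + registry.removeExpired(10_000L, 3_000L));
    }
}
```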
   
   
   
   
   
   





[GitHub] [dolphinscheduler] codecov-commenter commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1194889197

   # [Codecov](https://codecov.io/gh/apache/dolphinscheduler/pull/11144?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#11144](https://codecov.io/gh/apache/dolphinscheduler/pull/11144?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (cb892a7) into [dev](https://codecov.io/gh/apache/dolphinscheduler/commit/ccb76414686bf70d3ada7ad2d35fdc21c7b6f659?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ccb7641) will **increase** coverage by `0.15%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##                dev   #11144      +/-   ##
   ============================================
   + Coverage     40.34%   40.49%   +0.15%     
   - Complexity     4874     4881       +7     
   ============================================
     Files           950      950              
     Lines         37186    37178       -8     
     Branches       4078     4073       -5     
   ============================================
   + Hits          15003    15056      +53     
   + Misses        20673    20596      -77     
   - Partials       1510     1526      +16     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/dolphinscheduler/pull/11144?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...er/master/dispatch/host/assign/RandomSelector.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1tYXN0ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvc2VydmVyL21hc3Rlci9kaXNwYXRjaC9ob3N0L2Fzc2lnbi9SYW5kb21TZWxlY3Rvci5qYXZh) | `77.77% <0.00%> (-5.56%)` | :arrow_down: |
   | [...e/dolphinscheduler/remote/NettyRemotingClient.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1yZW1vdGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvcmVtb3RlL05ldHR5UmVtb3RpbmdDbGllbnQuamF2YQ==) | `50.00% <0.00%> (-2.78%)` | :arrow_down: |
   | [...dolphinscheduler/remote/future/ResponseFuture.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1yZW1vdGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvcmVtb3RlL2Z1dHVyZS9SZXNwb25zZUZ1dHVyZS5qYXZh) | `81.96% <0.00%> (-1.64%)` | :arrow_down: |
   | [...rver/master/runner/task/BlockingTaskProcessor.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1tYXN0ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvc2VydmVyL21hc3Rlci9ydW5uZXIvdGFzay9CbG9ja2luZ1Rhc2tQcm9jZXNzb3IuamF2YQ==) | `75.60% <0.00%> (-0.30%)` | :arrow_down: |
   | [...eduler/api/service/impl/DataSourceServiceImpl.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL3NlcnZpY2UvaW1wbC9EYXRhU291cmNlU2VydmljZUltcGwuamF2YQ==) | `28.47% <0.00%> (ø)` | |
   | [.../dolphinscheduler/plugin/task/datax/DataxTask.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci10YXNrLXBsdWdpbi9kb2xwaGluc2NoZWR1bGVyLXRhc2stZGF0YXgvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvcGx1Z2luL3Rhc2svZGF0YXgvRGF0YXhUYXNrLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...cheduler/plugin/task/api/ShellCommandExecutor.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci10YXNrLXBsdWdpbi9kb2xwaGluc2NoZWR1bGVyLXRhc2stYXBpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9kb2xwaGluc2NoZWR1bGVyL3BsdWdpbi90YXNrL2FwaS9TaGVsbENvbW1hbmRFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...er/api/service/impl/TaskDefinitionServiceImpl.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL3NlcnZpY2UvaW1wbC9UYXNrRGVmaW5pdGlvblNlcnZpY2VJbXBsLmphdmE=) | `24.01% <0.00%> (+0.05%)` | :arrow_up: |
   | [...hinscheduler/plugin/alert/wechat/WeChatSender.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hbGVydC9kb2xwaGluc2NoZWR1bGVyLWFsZXJ0LXBsdWdpbnMvZG9scGhpbnNjaGVkdWxlci1hbGVydC13ZWNoYXQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvcGx1Z2luL2FsZXJ0L3dlY2hhdC9XZUNoYXRTZW5kZXIuamF2YQ==) | `29.78% <0.00%> (+0.20%)` | :arrow_up: |
   | [...olphinscheduler/plugin/alert/email/MailSender.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hbGVydC9kb2xwaGluc2NoZWR1bGVyLWFsZXJ0LXBsdWdpbnMvZG9scGhpbnNjaGVkdWxlci1hbGVydC1lbWFpbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZG9scGhpbnNjaGVkdWxlci9wbHVnaW4vYWxlcnQvZW1haWwvTWFpbFNlbmRlci5qYXZh) | `51.38% <0.00%> (+0.35%)` | :arrow_up: |
   | ... and [3 more](https://codecov.io/gh/apache/dolphinscheduler/pull/11144/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [dolphinscheduler] davidzollo commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
davidzollo commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1194970221

   Please add the related licenses for the new dependencies; refer to https://dolphinscheduler.apache.org/en-us/community/development/DS-License.html




[GitHub] [dolphinscheduler] leo-lao commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
leo-lao commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r930607439


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+        private void checkActiveNode() {
+            long expireTime = properties.getConnectionExpireFactor() * properties.getListenerCheckInterval().toMillis();
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            for (Map.Entry<String, Long> entry : activeMasterServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeMasterServers.remove(nodeAddress);
+                    updateActiveMaster(activeMasterServers);

Review Comment:
   There may be too many write requests to the KVStore here, which would reduce efficiency.
   I think you could accumulate all the expired masters to remove and then update the KVStore once in a batch, rather than writing on each iteration.





[GitHub] [dolphinscheduler] zhuxt2015 commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r956751629


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/README.md:
##########
@@ -0,0 +1,34 @@
+# Introduction
+
+This module is the Raft consensus algorithm registry plugin; it uses a Raft cluster as the registry center.
+
+# How to use
+
+If you want to use Raft as the registry center,
+
+you need to set the registry properties in the `application.yml` of the master, worker, and api servers.
+
+Add a `/learner` suffix to the worker and api addresses in `server-address-list` to indicate that they do not participate in leader election.
+
+`please remember to change the server-port in application.yml`.
+
+NOTE: In a production environment, to achieve high availability, the number of masters must be odd, e.g. 3 or 5.
+
+```yaml
+registry:
+  type: raft
+  cluster-name: dolphinscheduler
+  server-address-list: 127.0.0.1:8181,127.0.0.1:8182/learner,127.0.0.1:8183/learner

Review Comment:
   This style cannot be used.
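   The README's advice to use an odd number of masters follows from Raft's quorum rule. A leader is elected and an entry is committed only with a strict majority of the voting members, so n voters need n / 2 + 1 (integer division) acknowledgements and can tolerate the remaining n minus that many failures. Non-voting learners, like the `/learner` entries in the README, are not counted in n. The small sketch below (the class name `RaftQuorum` is hypothetical) shows that 4 voters tolerate no more failures than 3.

```java
final class RaftQuorum {

    private RaftQuorum() {
    }

    /** Acknowledgements needed to elect a leader or commit an entry with n voting members. */
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    /** Voting members that may fail while a majority is still reachable. */
    static int toleratedFailures(int voters) {
        return voters - majority(voters);
    }
}
```

   Going from 3 to 4 voters raises the quorum from 2 to 3 without tolerating an extra failure, which is why odd sizes such as 3 or 5 are preferred.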



[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r930805662


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 1. EphemeralNodeRefreshThread check current master node connection and check ephemeral node expire time
+ * 2. maintain Map<String, Long> activeMasterServers
+ * 3. maintain Map<String, Long> activeWorkerServers
+ * 4. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, List<String>> master servers
+ * 5. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, List<String>> worker servers
+ * 6. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, List<String>> dead servers
+ */
+@Slf4j
+public class EphemeralNodeManager implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
+
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+
+    private RaftRegistryProperties properties;
+
+
+    private RheaKVStore kvStore;
+
+    public EphemeralNodeManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
+        this.properties = properties;
+        this.kvStore = kvStore;
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
+            2,
+            new ThreadFactoryBuilder().setNameFormat("EphemeralNodeRefreshThread").setDaemon(true).build());
+
+    public void start() {
+        scheduledExecutorService.scheduleWithFixedDelay(new ConnectionCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        connectionListeners.clear();
+        dataSubScribeMap.clear();
+        scheduledExecutorService.shutdown();
+    }
+
+    public void addConnectionListener(ConnectionListener listener) {
+        connectionListeners.add(listener);
+    }
+
+    public boolean addSubscribeListener(String path, SubscribeListener listener) {
+        return dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
+    }
+
+    public void removeSubscribeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    private class ConnectionCheckTask implements Runnable {
+        private ConnectionState connectionState = null;
+
+        @Override
+        public void run() {
+            checkConnection();
+            checkActiveNode();
+        }
+
+        private void checkConnection() {
+            final String host = NetUtils.getHost();
+            if (getActiveMasterServers().keySet().stream().anyMatch(address -> address.split(Constants.COLON)[0].equals(host))) {
+                if (connectionState == null && !connectionListeners.isEmpty()) {
+                    triggerListener(ConnectionState.CONNECTED);
+                } else if (connectionState == ConnectionState.DISCONNECTED) {
+                    triggerListener(ConnectionState.RECONNECTED);
+                } else {
+                    triggerListener(ConnectionState.CONNECTED);
+                }
+                connectionState = ConnectionState.CONNECTED;
+            }
+        }
+
+        private void checkActiveNode() {
+            long expireTime = properties.getConnectionExpireFactor() * properties.getListenerCheckInterval().toMillis();
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            for (Map.Entry<String, Long> entry : activeMasterServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeMasterServers.remove(nodeAddress);
+                    updateActiveMaster(activeMasterServers);
+                    addDeadServer(Constants.MASTER_TYPE, nodeAddress);
+                    if (nodeAddress.split(Constants.COLON)[0].equals(NetUtils.getHost())) {
+                        connectionState = ConnectionState.DISCONNECTED;
+                        triggerListener(ConnectionState.DISCONNECTED);
+                        removeNodeData(nodeAddress);
+                    }
+                    log.warn("Master server {} connect to raft cluster timeout, last heartbeat {}, timeout config {} ms",
+                            nodeAddress, convertTimeToString(entry.getValue()), expireTime);
+                }
+            }
+            Map<String, Long> activeWorkerServers = getActiveWorkerServers();
+            for (Map.Entry<String, Long> entry : activeWorkerServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeWorkerServers.remove(nodeAddress);
+                    updateActiveWorker(nodeAddress, activeWorkerServers);
+                    removeWorkerGroup(nodeAddress);
+                    addDeadServer(Constants.WORKER_TYPE, nodeAddress);
+                    removeNodeData(nodeAddress);
+                    log.warn("Worker server {} connect to raft cluster timeout, last heartbeat {}, timeout config {} ms",
+                            nodeAddress, convertTimeToString(entry.getValue()), expireTime);
+                }
+            }
+        }
+
+        private void triggerListener(ConnectionState connectionState) {
+            connectionListeners.forEach(listener -> listener.onUpdate(connectionState));
+        }
+    }
+
+    private class SubscribeCheckTask implements Runnable {
+
+        private final Map<String, String> nodeDataMap = new ConcurrentHashMap<>();
+
+        @Override
+        public void run() {
+            subscribeCheck();
+        }
+
+        private void subscribeCheck() {
+            if (dataSubScribeMap.isEmpty()) {
+                return;
+            }
+            final Map<String, String> currentNodeDataMap = getNodeDataMap();
+            // find the different
+            Map<String, String> addedData = new HashMap<>();
+            Map<String, String> deletedData = new HashMap<>();
+            Map<String, String> updatedData = new HashMap<>();
+            for (Map.Entry<String, String> entry : currentNodeDataMap.entrySet()) {
+                final String oldData = nodeDataMap.get(entry.getKey());
+                if (oldData == null) {
+                    addedData.put(entry.getKey(), entry.getValue());
+                } else {
+                    HeartBeat newHeartBeat = HeartBeat.decodeHeartBeat(entry.getValue());

Review Comment:
   I think the registry should not care about the content; it should make its own judgment, independent of the node data content. WDYT?
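   One way to follow this suggestion, sketched under the assumption that node data is kept as `Map<String, String>` snapshots, as in `SubscribeCheckTask`. The idea is to classify keys as added, removed, or updated purely by key presence and raw string equality, so the registry never has to decode a `HeartBeat`. `SnapshotDiff` and `diff` are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class SnapshotDiff {

    final Map<String, String> added = new HashMap<>();
    final Map<String, String> removed = new HashMap<>();
    final Map<String, String> updated = new HashMap<>();

    /** Compares two snapshots by key presence and raw value equality only. */
    static SnapshotDiff diff(Map<String, String> previous, Map<String, String> current) {
        SnapshotDiff result = new SnapshotDiff();
        for (Map.Entry<String, String> entry : current.entrySet()) {
            String key = entry.getKey();
            if (!previous.containsKey(key)) {
                result.added.put(key, entry.getValue());
            } else if (!Objects.equals(previous.get(key), entry.getValue())) {
                // Any change to the stored string counts; the payload format is never parsed.
                result.updated.put(key, entry.getValue());
            }
        }
        for (Map.Entry<String, String> entry : previous.entrySet()) {
            if (!current.containsKey(entry.getKey())) {
                result.removed.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

   The trade-off is that if the payload carries a field that changes on every report, such as a report time, every refresh surfaces as an UPDATE event. That is the cost of not interpreting the content.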



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+        private void checkActiveNode() {
+            long expireTime = properties.getConnectionExpireFactor() * properties.getListenerCheckInterval().toMillis();
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            for (Map.Entry<String, Long> entry : activeMasterServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeMasterServers.remove(nodeAddress);
+                    updateActiveMaster(activeMasterServers);
+                    addDeadServer(Constants.MASTER_TYPE, nodeAddress);
+                    if (nodeAddress.split(Constants.COLON)[0].equals(NetUtils.getHost())) {

Review Comment:
   This should compare `host:port`, not just the host, because multiple master services may be deployed on the same machine.
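   A sketch of the check the reviewer asks for. It assumes node addresses are stored as `host:port` strings, as elsewhere in the diff, and compares the whole address instead of only the part before the colon. `LocalNodeMatcher` and its parameters are hypothetical. In the PR the local host comes from `NetUtils.getHost()`, and the port would have to come from the server's own configuration.

```java
final class LocalNodeMatcher {

    private LocalNodeMatcher() {
    }

    /** Host-only comparison, as in the diff: every master on the same machine matches. */
    static boolean sameHost(String nodeAddress, String localHost) {
        return nodeAddress.split(":")[0].equals(localHost);
    }

    /** Full comparison: only this exact host:port instance matches. */
    static boolean sameNode(String nodeAddress, String localHost, int localPort) {
        return nodeAddress.equals(localHost + ":" + localPort);
    }
}
```

   Suppose two masters run on one machine, on illustrative ports 5678 and 5679. `sameHost` would treat the expiry of the other master as this node's own disconnection, while `sameNode` would not.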



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+    private class SubscribeCheckTask implements Runnable {
+
+        private final Map<String, String> nodeDataMap = new ConcurrentHashMap<>();
+
+        @Override
+        public void run() {
+            subscribeCheck();
+        }
+
+        private void subscribeCheck() {
+            if (dataSubScribeMap.isEmpty()) {
+                return;
+            }
+            final Map<String, String> currentNodeDataMap = getNodeDataMap();
+            // find the different
+            Map<String, String> addedData = new HashMap<>();
+            Map<String, String> deletedData = new HashMap<>();
+            Map<String, String> updatedData = new HashMap<>();
+            for (Map.Entry<String, String> entry : currentNodeDataMap.entrySet()) {
+                final String oldData = nodeDataMap.get(entry.getKey());
+                if (oldData == null) {
+                    addedData.put(entry.getKey(), entry.getValue());
+                } else {
+                    HeartBeat newHeartBeat = HeartBeat.decodeHeartBeat(entry.getValue());
+                    HeartBeat oldHeartBeat = HeartBeat.decodeHeartBeat(oldData);
+                    if (newHeartBeat != null && newHeartBeat.getReportTime() != oldHeartBeat.getReportTime()) {
+                        updatedData.put(entry.getKey(), entry.getValue());
+                    }
+                }
+            }
+            for (Map.Entry<String, String> entry : nodeDataMap.entrySet()) {
+                if (!currentNodeDataMap.containsKey(entry.getKey())) {
+                    deletedData.put(entry.getKey(), entry.getValue());
+                }
+            }
+            nodeDataMap.clear();
+            nodeDataMap.putAll(currentNodeDataMap);
+            // trigger listener
+            for (Map.Entry<String, List<SubscribeListener>> entry : dataSubScribeMap.entrySet()) {
+                String subscribeKey = entry.getKey();
+                List<SubscribeListener> subscribeListeners = entry.getValue();
+                triggerListener(addedData, subscribeKey, subscribeListeners, Event.Type.ADD);
+                triggerListener(deletedData, subscribeKey, subscribeListeners, Event.Type.REMOVE);
+                triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE);
+            }
+
+        }
+
+        private void triggerListener(Map<String, String> nodeDataMap, String subscribeKey, List<SubscribeListener> subscribeListeners, Event.Type type) {
+            for (Map.Entry<String, String> entry : nodeDataMap.entrySet()) {
+                final String key = entry.getKey();
+                if (key.startsWith(subscribeKey)) {
+                    subscribeListeners.forEach(listener -> listener.notify(new Event(key, key, entry.getValue(), type)));
+                }
+            }
+        }
+    }
+
+    public static String convertTimeToString(Long time) {
+        DateTimeFormatter ftf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+        return ftf.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()));
+    }
+
+    public void putHandler(String key, String value) {
+        final String nodeAddress = key.substring(key.lastIndexOf(Constants.SINGLE_SLASH) + 1);
+        //update heart beat time and node set
+        if (key.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS)) {
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            activeMasterServers.put(nodeAddress, System.currentTimeMillis());
+            updateActiveMaster(activeMasterServers);
+            removeDeadServer(Constants.MASTER_TYPE, nodeAddress);
+            addNodeData(key, value);
+        } else if (key.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)) {
+            Map<String, Long> activeWorkerServers = getActiveWorkerServers();
+            activeWorkerServers.put(nodeAddress, System.currentTimeMillis());
+            updateActiveWorker(key, activeWorkerServers);
+            addWorkerGroup(key);
+            removeDeadServer(Constants.WORKER_TYPE, nodeAddress);
+            addNodeData(key, value);
+        } else if (key.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS)) {
+            final List<String> deadServers = getDeadServers();
+            if (!deadServers.contains(nodeAddress)) {
+                deadServers.add(nodeAddress);
+                kvStore.bPut(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, writeUtf8(JSONUtils.toJsonString(deadServers)));
+            }
+        }
+
+    }
+
+    public void deleteHandler(String key) {
+        final String nodeAddress = key.substring(key.lastIndexOf(Constants.SINGLE_SLASH) + 1);
+        if (key.contains(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS)) {
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            activeMasterServers.remove(nodeAddress);
+            updateActiveMaster(activeMasterServers);
+            removeNodeData(nodeAddress);
+            log.info("Raft registry remove master server {}", nodeAddress);
+        } else if (key.contains(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)) {
+            Map<String, Long> activeWorkerServers = getActiveWorkerServers();
+            activeWorkerServers.remove(nodeAddress);
+            updateActiveWorker(key, activeWorkerServers);
+            removeWorkerGroup(nodeAddress);
+            removeNodeData(nodeAddress);
+            log.info("Raft registry remove worker server {}", nodeAddress);
+        }
+    }
+
+    private void updateActiveMaster(Map<String, Long> activeNodes) {
+        kvStore.bPut(Constants.MASTER_TYPE, writeUtf8(JSONUtils.toJsonString(activeNodes)));
+        //Update the mapping of the master group and master node list
+        kvStore.bPut(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, writeUtf8(JSONUtils.toJsonString(activeNodes.keySet())));
+    }
+
+    private void updateActiveWorker(String key, Map<String, Long> activeNodes) {
+        kvStore.bPut(Constants.WORKER_TYPE, writeUtf8(JSONUtils.toJsonString(activeNodes)));
+        //Update the mapping of the worker group and worker node list
+        kvStore.bPut(key.substring(0, key.lastIndexOf(Constants.SINGLE_SLASH)), writeUtf8(JSONUtils.toJsonString(activeNodes.keySet())));
+    }
+
+    private void addDeadServer(String nodeType, String nodeAddress) {
+        final String deadServerAddress = getDeadServerSuffix(nodeType, nodeAddress);
+        List<String> deadServers = getDeadServers();
+        if (!deadServers.contains(deadServerAddress)) {
+            deadServers.add(deadServerAddress);
+            kvStore.bPut(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, writeUtf8(JSONUtils.toJsonString(deadServers)));
+        }
+    }
+
+    private void removeDeadServer(String nodeType, String nodeAddress) {
+        final String deadServerAddress = getDeadServerSuffix(nodeType, nodeAddress);
+        List<String> deadServers = getDeadServers();
+        if (deadServers.contains(deadServerAddress)) {
+            deadServers.remove(deadServerAddress);
+            kvStore.bPut(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, writeUtf8(JSONUtils.toJsonString(deadServers)));
+        }
+    }
+
+    /**
+     * @return IP:Port->TimeMillis
+     */
+    private Map<String, Long> getActiveWorkerServers() {
+        final String servers = readUtf8(kvStore.bGet(Constants.WORKER_TYPE));
+        if (StringUtils.isEmpty(servers)) {
+            return new HashMap<>();
+        }
+        return JSONUtils.toMap(servers, String.class, Long.class);
+    }
+
+    private List<String> getDeadServers() {
+        final String storedDeadServers = readUtf8(kvStore.bGet(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS));
+        if (StringUtils.isEmpty(storedDeadServers)) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(JSONUtils.toList(storedDeadServers, String.class));
+    }
+
+    /**
+     * @return IP:Port->TimeMillis
+     */
+    private Map<String, Long> getActiveMasterServers() {
+        final String storedMasterServers = readUtf8(kvStore.bGet(Constants.MASTER_TYPE));
+        if (StringUtils.isEmpty(storedMasterServers)) {
+            return new HashMap<>();
+        }
+        return JSONUtils.toMap(storedMasterServers, String.class, Long.class);
+    }
+
+    private String getDeadServerSuffix(String nodeType, String serverAddress) {
+        return nodeType + Constants.UNDERLINE + serverAddress;
+    }
+
+    private void addWorkerGroup(String key) {

Review Comment:
   `WorkerGroup` belongs to the business domain; it is not part of the registry.
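   The handlers in this hunk derive everything from the registry key: the node address is the segment after the last `/`, `updateActiveWorker` treats the prefix before it as the worker-group key, and dead-server entries are `nodeType` + `Constants.UNDERLINE` + address. The following is a minimal, self-contained sketch of that string handling only, assuming keys shaped like `/nodes/worker/<group>/<ip:port>` and assuming `Constants.UNDERLINE` is `_`. The class and method names are hypothetical and are not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the string handling in deleteHandler,
// updateActiveWorker and addDeadServer. Assumes keys like
// "/nodes/worker/<group>/<ip:port>" and "_" as the dead-server separator.
public final class RegistryKeySketch {

    private static final String SINGLE_SLASH = "/";
    private static final String UNDERLINE = "_";

    // Segment after the last slash, e.g. "127.0.0.1:1234".
    static String nodeAddress(String key) {
        return key.substring(key.lastIndexOf(SINGLE_SLASH) + 1);
    }

    // Everything before the last slash, e.g. "/nodes/worker/default".
    static String parentKey(String key) {
        return key.substring(0, key.lastIndexOf(SINGLE_SLASH));
    }

    // Dead-server entry, e.g. "worker_127.0.0.1:1234".
    static String deadServerEntry(String nodeType, String address) {
        return nodeType + UNDERLINE + address;
    }

    // Idempotent add in the style of addDeadServer: true only if the list changed.
    static boolean addIfAbsent(List<String> deadServers, String entry) {
        if (deadServers.contains(entry)) {
            return false;
        }
        return deadServers.add(entry);
    }

    public static void main(String[] args) {
        String key = "/nodes/worker/default/127.0.0.1:1234";
        List<String> dead = new ArrayList<>();
        String entry = deadServerEntry("worker", nodeAddress(key));
        System.out.println(nodeAddress(key));          // 127.0.0.1:1234
        System.out.println(parentKey(key));            // /nodes/worker/default
        System.out.println(addIfAbsent(dead, entry));  // true
        System.out.println(addIfAbsent(dead, entry));  // false
    }
}
```

   Each helper in the diff reads the stored JSON, modifies it locally and writes it back with `bPut`; whether concurrent writers to the same key are serialized depends on code outside this excerpt.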


[GitHub] [dolphinscheduler] EricGao888 commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1336770169

   @zhuxt2015 This is a significant improvement. By using JRaft as the registry plugin, it seems the registry center will no longer need to rely on external third-party components such as MySQL, etcd, or ZooKeeper. May I ask whether you have time to follow up on this PR? Thanks : )


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1194897739

   SonarCloud Quality Gate failed: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144
   
   Bugs: 0 (rating A)  
   Vulnerabilities: 0 (rating A)  
   Security Hotspots: 0 (rating A)  
   Code Smells: 1 (rating A)
   
   Coverage: 0.0%  
   Duplication: 0.0%
   

Re: [PR] [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency [dolphinscheduler]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency
URL: https://github.com/apache/dolphinscheduler/pull/11144


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1195250508

   SonarCloud Quality Gate failed: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144
   
   Bugs: 0 (rating A)  
   Vulnerabilities: 0 (rating A)  
   Security Hotspots: 0 (rating A)  
   Code Smells: 3 (rating A)
   
   Coverage: 12.5%  
   Duplication: 0.0%
   

[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1195246259

   SonarCloud Quality Gate failed: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144
   
   Bugs: 0 (rating A)  
   Vulnerabilities: 0 (rating A)  
   Security Hotspots: 0 (rating A)  
   Code Smells: 3 (rating A)
   
   Coverage: 12.5%  
   Duplication: 0.0%
   

[GitHub] [dolphinscheduler] EricGao888 commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by "EricGao888 (via GitHub)" <gi...@apache.org>.
EricGao888 commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1537883788

   @zhuxt2015 Hi, would you like to continue with this work? If you are no longer interested or do not have time for it, would you mind if I helped with the rest?


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1200454297

   SonarCloud Quality Gate failed: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144
   
   Bugs: 0 (rating A)  
   Vulnerabilities: 0 (rating A)  
   Security Hotspots: 1 (rating E)  
   Code Smells: 6 (rating A)
   
   Coverage: 0.0%  
   Duplication: 0.0%
   

[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1201317190

   SonarCloud Quality Gate failed: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144
   
   Bugs: 0 (rating A)  
   Vulnerabilities: 0 (rating A)  
   Security Hotspots: 0 (rating A)  
   Code Smells: 0 (rating A)
   
   Coverage: 0.0%  
   Duplication: 0.0%
   

[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1194892833

   SonarCloud Quality Gate failed: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11144
   
   Bugs: 0 (rating A)  
   Vulnerabilities: 0 (rating A)  
   Security Hotspots: 0 (rating A)  
   Code Smells: 1 (rating A)
   
   Coverage: 0.0%  
   Duplication: 0.0%
   

[GitHub] [dolphinscheduler] ruanwenjun commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r934871197


##########
dolphinscheduler-dist/release-docs/LICENSE:
##########
@@ -445,7 +445,17 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
     websocket-common 9.4.44.v20210927: https://mvnrepository.com/artifact/org.eclipse.jetty.websocket/websocket-common/9.4.44.v20210927, Apache 2.0
     zeppelin-client 0.10.1: https://mvnrepository.com/artifact/org.apache.zeppelin/zeppelin-client/0.10.1, Apache 2.0
     zeppelin-common 0.10.1: https://mvnrepository.com/artifact/org.apache.zeppelin/zeppelin-common/0.10.1, Apache 2.0
-
+    jraft-core 1.3.11: https://mvnrepository.com/artifact/com.alipay.sofa/jraft-core/1.3.11, Apache 2.0
+    jraft-rheakv-core 1.3.11: https://mvnrepository.com/artifact/com.alipay.sofa/jraft-rheakv-core/1.3.11, Apache 2.0
+    sofa-common-tools 1.0.12: https://mvnrepository.com/artifact/com.alipay.sofa.common/sofa-common-tools/1.0.12, Apache 2.0
+    bolt 1.6.4: https://mvnrepository.com/artifact/com.alipay.sofa/bolt/1.6.4, Apache 2.0
+    hessian 3.3.6: https://mvnrepository.com/artifact/com.alipay.sofa/hessian/3.3.6, Apache 2.0
+    affinity 3.1.7: https://mvnrepository.com/artifact/net.openhft/affinity/3.1.7, Apache 2.0
+    annotations 12.0: https://mvnrepository.com/artifact/com.intellij/annotations/12.0, Apache 2.0
+    disruptor 3.3.7: https://mvnrepository.com/artifact/com.lmax/disruptor/3.3.7, Apache 2.0
+    jctools-core 2.1.1: https://mvnrepository.com/artifact/org.jctools/jctools-core/2.1.1, Apache 2.0
+    metrics-core 4.1.26: https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core/4.1.26, Apache 2.0
+    rocksdbjni 6.22.1.1: https://search.maven.org/artifact/org.rocksdb/rocksdbjni/6.22.1.1, Apache 2.0

Review Comment:
   ```suggestion
       rocksdbjni 6.22.1.1: https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.22.1.1, Apache 2.0
   ```
   



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/README.md:
##########
@@ -0,0 +1,34 @@
+# Introduction
+
+This module is the Raft consensus algorithm registry plugin; it uses a Raft cluster as the registry center.
+
+# How to use
+
+If you want to use Raft as the registry center,
+
+you need to set the registry properties in the `application.yml` of the master, worker and api servers.
+
+In `server-address-list`, add the `/learner` suffix to the worker and api addresses to indicate that they do not participate in leader election.
+
+`Please remember to change the server-port in application.yml.`
+
+NOTE: In a production environment, the number of masters must be odd (e.g. 3 or 5) to achieve high availability.
+
+```yaml
+registry:
+  type: raft
+  cluster-name: dolphinscheduler
+  server-address-list: 127.0.0.1:8181,127.0.0.1:8182/learner,127.0.0.1:8183/learner
+  log-storage-dir: raft-data/
+  election-timeout: 1000ms
+  listener-check-interval: 2s
+  distributed-lock-timeout: 3s
+  server-address: 127.0.0.1
+  server-port: 8183
+  module: api
+  rpc-core-threads: 8
+  rpc-timeout-millis: 5000ms

Review Comment:
   Do these properties have default values?



##########
dolphinscheduler-master/pom.xml:
##########
@@ -225,6 +229,10 @@
                     <artifactId>jersey-core</artifactId>
                     <groupId>com.sun.jersey</groupId>
                 </exclusion>
+                <exclusion>

Review Comment:
   It's better to define the `protobuf-java` version in the BOM. Then you don't need to exclude it everywhere.



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/README.md:
##########
@@ -0,0 +1,34 @@
+# Introduction
+
+This module is the Raft consensus algorithm registry plugin; it uses a Raft cluster as the registry center.
+
+# How to use
+
+If you want to use Raft as the registry center,
+
+you need to set the registry properties in the `application.yml` of the master, worker and api servers.
+
+In `server-address-list`, add the `/learner` suffix to the worker and api addresses to indicate that they do not participate in leader election.
+
+`Please remember to change the server-port in application.yml.`
+
+NOTE: In a production environment, the number of masters must be odd (e.g. 3 or 5) to achieve high availability.
+
+```yaml
+registry:
+  type: raft
+  cluster-name: dolphinscheduler
+  server-address-list: 127.0.0.1:8181,127.0.0.1:8182/learner,127.0.0.1:8183/learner

Review Comment:
   You can use a list directly in YAML.
   ```suggestion
     server-address-list:
       - 127.0.0.1:8181
       - 127.0.0.1:8182/learner
       - 127.0.0.1:8183/learner
   ```
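
   A minimal sketch, assuming `serverAddressList` in `RaftRegistryProperties` is changed from `String` to `List<String>` so that Spring Boot binds the YAML sequence directly. The constructor under review passes a comma-separated string to `withInitialServerList(...)`, so the list would have to be joined back. `RaftServerAddresses`, `toInitialServerList` and `isLearner` are hypothetical helpers, not part of the PR.

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   // Hypothetical helper for a YAML-bound address list such as
   //   ["127.0.0.1:8181", "127.0.0.1:8182/learner"]
   final class RaftServerAddresses {

       // Marks a node that does not take part in leader election (per the README).
       static final String LEARNER_SUFFIX = "/learner";

       private RaftServerAddresses() {
       }

       // Trim each entry, drop blank ones, and join with commas, producing the
       // single string that withInitialServerList(String) currently receives.
       static String toInitialServerList(List<String> addresses) {
           return addresses.stream()
                   .map(String::trim)
                   .filter(s -> !s.isEmpty())
                   .collect(Collectors.joining(","));
       }

       // True when the entry carries the "/learner" suffix.
       static boolean isLearner(String address) {
           return address.trim().endsWith(LEARNER_SUFFIX);
       }
   }
   ```

   The string-joining step keeps the Raft client call unchanged, so only the properties class and the YAML format would change.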



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.RpcOptions;
+import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.StoreEngineOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+import com.alipay.sofa.jraft.rhea.storage.StorageType;
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+public class RaftRegistry implements Registry {
+
+    private final Map<String, DistributedLock<byte[]>> distributedLockMap = new ConcurrentHashMap<>();
+
+    private final RheaKVStore kvStore;
+
+    private final RaftRegistryProperties properties;
+
+    private SubscribeListenerManager subscribeListenerManager;
+
+    private static final String REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS = "worker-groups";
+
+    private static final String API_TYPE = "api";
+
+    public RaftRegistry(RaftRegistryProperties properties) {
+        this.properties = properties;
+        //init RheaKVStore
+        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+                .withFake(true) // use a fake pd
+                .config();
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setElectionTimeoutMs((int) properties.getElectionTimeout().toMillis());
+        final Endpoint serverAddress = new Endpoint(properties.getServerAddress(), properties.getServerPort());
+        final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured()
+                .withStorageType(StorageType.Memory)
+                .withRaftDataPath(properties.getLogStorageDir())
+                .withServerAddress(serverAddress)
+                .withCommonNodeOptions(nodeOptions)
+                .withKvRpcCoreThreads(properties.getRpcCoreThreads())
+                .config();
+        RpcOptions rpcOptions = new RpcOptions();
+        rpcOptions.setCallbackExecutorCorePoolSize(properties.getRpcCoreThreads());
+        rpcOptions.setRpcTimeoutMillis((int) properties.getRpcTimeoutMillis().toMillis());
+        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
+                .withClusterName(properties.getClusterName())
+                .withUseParallelCompress(true)
+                .withInitialServerList(properties.getServerAddressList())
+                .withStoreEngineOptions(storeOpts)
+                .withPlacementDriverOptions(pdOpts)
+                .withRpcOptions(rpcOptions)
+                .config();
+        this.kvStore = new DefaultRheaKVStore();
+        this.kvStore.init(opts);
+        log.info("kvStore started...");
+        if (!properties.getModule().equalsIgnoreCase(API_TYPE)) {
+            this.subscribeListenerManager = new SubscribeListenerManager(properties, kvStore);
+            subscribeListenerManager.start();
+        }
+    }
+
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        return subscribeListenerManager.addSubscribeListener(path, listener);
+    }
+
+    @Override
+    public void unsubscribe(String path) {
+        subscribeListenerManager.removeSubscribeListener(path);
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        final String groupId = properties.getClusterName() + "--1";
+        final Node node = NodeManager.getInstance().get(groupId, new PeerId((properties.getServerAddress()), properties.getServerPort()));
+        node.addReplicatorStateListener(new RaftConnectionStateListener(listener));
+    }
+
+    @Override
+    public String get(String key) {
+        return readUtf8(kvStore.bGet(key));
+    }
+
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        kvStore.bPut(key, writeUtf8(value));
+        if (key.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
+            addWorkerGroup(key);
+        }
+    }
+
+    private void addWorkerGroup(String key) {
+        List<String> workerGroupList = getWorkerGroups();
+        String workerGroup = key.substring(key.indexOf(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)
+                + Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS.length() + 1, key.lastIndexOf(Constants.SINGLE_SLASH));
+        if (!workerGroupList.contains(workerGroup)) {
+            workerGroupList.add(workerGroup);
+            kvStore.bPut(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS, writeUtf8(JSONUtils.toJsonString(workerGroupList)));
+        }
+    }
+
+    private List<String> getWorkerGroups() {
+        final String storedWorkerGroup = readUtf8(kvStore.bGet(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS));
+        if (StringUtils.isEmpty(storedWorkerGroup)) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(JSONUtils.toList(storedWorkerGroup, String.class));
+    }
+
+    @Override
+    public void delete(String key) {
+        kvStore.bDelete(key);
+        final DistributedLock<byte[]> distributedLock = distributedLockMap.get(key);
+        if (distributedLock != null) {
+            distributedLock.unlock();
+        }
+        distributedLockMap.remove(key);
+    }
+
+    @Override
+    public Collection<String> children(String key) {
+        List<String> children = new ArrayList<>();
+        if (key.equals(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)) {
+            //get all worker groups
+            children = getWorkerGroups();
+
+        } else {
+            final List<KVEntry> result = kvStore.bScan(key, key + Constants.SINGLE_SLASH + Constants.RANDOM_STRING);
+            if (result.isEmpty()) {
+                return new ArrayList<>();
+            }
+            for (final KVEntry kv : result) {
+                final String entryKey = readUtf8(kv.getKey());
+                if (StringUtils.isEmpty(readUtf8(kv.getValue())) || StringUtils.isEmpty(entryKey)) {
+                    continue;
+                }
+                String child = entryKey.substring(entryKey.lastIndexOf(Constants.SINGLE_SLASH) + 1);
+                children.add(child);
+            }
+        }
+        children.sort(Comparator.reverseOrder());
+        return children;
+    }
+
+    @Override
+    public boolean exists(String key) {
+        return kvStore.bContainsKey(key);
+    }
+
+    @Override
+    public boolean acquireLock(String key) {
+        final DistributedLock<byte[]> distributedLock = kvStore.getDistributedLock(key, properties.getDistributedLockTimeout().toMillis(), TimeUnit.MILLISECONDS);
+        final boolean lock = distributedLock.tryLock();

Review Comment:
   Can this method return false? In fact, `acquireLock` needs to block until the lock is acquired.
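
   One way to meet the blocking contract, sketched under assumptions: keep retrying the non-blocking `tryLock()` until it succeeds, sleeping between attempts and honouring interruption. `BlockingLockSupport` and `acquireBlocking` are hypothetical names; the `BooleanSupplier` stands in for `distributedLock::tryLock`, and the retry interval is an assumed value rather than an existing registry property.

   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.function.BooleanSupplier;

   final class BlockingLockSupport {

       private BlockingLockSupport() {
       }

       // Calls the non-blocking tryLock repeatedly until it succeeds.
       // Returns true once the lock is held; returns false only if the calling
       // thread is interrupted while waiting (the interrupt flag is restored).
       static boolean acquireBlocking(BooleanSupplier tryLock, long retryInterval, TimeUnit unit) {
           while (!tryLock.getAsBoolean()) {
               try {
                   unit.sleep(retryInterval);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return false;
               }
           }
           return true;
       }
   }
   ```

   In `acquireLock` this might be called as `acquireBlocking(distributedLock::tryLock, 100, TimeUnit.MILLISECONDS)`, putting the lock into `distributedLockMap` only after it returns true.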



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import java.time.Duration;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import lombok.Data;
+
+@Data
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+@ConfigurationProperties(prefix = "registry")
+public class RaftRegistryProperties {
+    private String clusterName;
+    private String serverAddressList;

Review Comment:
   Do we need to add a validator for this class?
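
   One option is Spring's `@Validated` on the properties class together with Bean Validation constraints such as `@NotBlank`, which Spring Boot applies when a validation provider is on the classpath. The sketch below is a dependency-free alternative that checks a few of the fields explicitly; `RaftRegistryPropertiesValidator` is hypothetical, and the chosen rules (non-blank names, port in 1..65535) are assumptions about what should be mandatory.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical validation for a subset of the RaftRegistryProperties fields.
   final class RaftRegistryPropertiesValidator {

       private RaftRegistryPropertiesValidator() {
       }

       // Collects every problem instead of stopping at the first one,
       // so a startup failure can report all misconfigured keys at once.
       static List<String> validate(String clusterName, String serverAddressList,
                                    String serverAddress, int serverPort) {
           List<String> errors = new ArrayList<>();
           if (isBlank(clusterName)) {
               errors.add("registry.cluster-name must not be blank");
           }
           if (isBlank(serverAddressList)) {
               errors.add("registry.server-address-list must not be blank");
           }
           if (isBlank(serverAddress)) {
               errors.add("registry.server-address must not be blank");
           }
           if (serverPort < 1 || serverPort > 65535) {
               errors.add("registry.server-port must be between 1 and 65535, got " + serverPort);
           }
           return errors;
       }

       private static boolean isBlank(String s) {
           return s == null || s.trim().isEmpty();
       }
   }
   ```

   The registry would then fail fast, e.g. by throwing an `IllegalArgumentException` listing the errors, before initializing the `RheaKVStore`.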



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import java.time.Duration;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import lombok.Data;
+
+@Data
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+@ConfigurationProperties(prefix = "registry")
+public class RaftRegistryProperties {
+    private String clusterName;
+    private String serverAddressList;
+    private String serverAddress;
+    private int serverPort;
+    private String logStorageDir;
+    private Duration distributedLockTimeout = Duration.ofSeconds(3);
+    private Duration electionTimeout = Duration.ofMillis(1000);
+    private Duration listenerCheckInterval = Duration.ofSeconds(2);
+    private String module = "master";
+    private int rpcCoreThreads = 8;

Review Comment:
   ```suggestion
       private int rpcCoreThreads = Runtime.getRuntime().availableProcessors();
   ```



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/SubscribeListenerManager.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class SubscribeListenerManager implements AutoCloseable {
+
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+
+    private final RaftRegistryProperties properties;
+
+    private final RheaKVStore kvStore;
+
+    public SubscribeListenerManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
+        this.properties = properties;
+        this.kvStore = kvStore;
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
+            1,
+            new ThreadFactoryBuilder().setNameFormat("SubscribeListenerCheckThread").setDaemon(true).build());
+
+    public void start() {
+        scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        dataSubScribeMap.clear();
+        scheduledExecutorService.shutdown();
+    }
+
+    public boolean addSubscribeListener(String path, SubscribeListener listener) {
+        return dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
+    }
+
+    public void removeSubscribeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    private class SubscribeCheckTask implements Runnable {
+
+        private final Map<String, String> nodeDataMap = new ConcurrentHashMap<>();
+
+        @Override
+        public void run() {

Review Comment:
   We may need to make sure this method does not throw an exception; otherwise the scheduled thread pool will suppress all subsequent executions of the task.
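
   A common pattern is to wrap the task so that nothing escapes `run()`. `ScheduledExecutorService.scheduleWithFixedDelay` documents that if any execution throws, subsequent executions are suppressed. `SafeRunnable` is a hypothetical name; the PR logs through slf4j, and `java.util.logging` is used here only to keep the sketch dependency-free.

   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical wrapper that keeps a fixed-delay schedule alive when one run fails.
   final class SafeRunnable implements Runnable {

       private static final Logger LOG = Logger.getLogger(SafeRunnable.class.getName());

       private final Runnable delegate;

       SafeRunnable(Runnable delegate) {
           this.delegate = delegate;
       }

       @Override
       public void run() {
           try {
               delegate.run();
           } catch (Exception e) {
               // Log and swallow so the executor schedules the next run.
               // Errors (e.g. OutOfMemoryError) are deliberately not caught.
               LOG.log(Level.WARNING, "Scheduled task failed; it will run again after the next delay", e);
           }
       }
   }
   ```

   Usage would be `scheduledExecutorService.scheduleWithFixedDelay(new SafeRunnable(new SubscribeCheckTask()), ...)`, leaving `SubscribeCheckTask` itself unchanged.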



##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistry.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.RpcOptions;
+import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.StoreEngineOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+import com.alipay.sofa.jraft.rhea.storage.StorageType;
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+public class RaftRegistry implements Registry {
+
+    private final Map<String, DistributedLock<byte[]>> distributedLockMap = new ConcurrentHashMap<>();
+
+    private final RheaKVStore kvStore;
+
+    private final RaftRegistryProperties properties;
+
+    private SubscribeListenerManager subscribeListenerManager;
+
+    private static final String REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS = "worker-groups";
+
+    private static final String API_TYPE = "api";
+
+    public RaftRegistry(RaftRegistryProperties properties) {
+        this.properties = properties;
+        //init RheaKVStore
+        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+                .withFake(true) // use a fake pd
+                .config();
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setElectionTimeoutMs((int) properties.getElectionTimeout().toMillis());
+        final Endpoint serverAddress = new Endpoint(properties.getServerAddress(), properties.getServerPort());
+        final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured()
+                .withStorageType(StorageType.Memory)
+                .withRaftDataPath(properties.getLogStorageDir())
+                .withServerAddress(serverAddress)
+                .withCommonNodeOptions(nodeOptions)
+                .withKvRpcCoreThreads(properties.getRpcCoreThreads())
+                .config();
+        RpcOptions rpcOptions = new RpcOptions();
+        rpcOptions.setCallbackExecutorCorePoolSize(properties.getRpcCoreThreads());
+        rpcOptions.setRpcTimeoutMillis((int) properties.getRpcTimeoutMillis().toMillis());
+        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
+                .withClusterName(properties.getClusterName())
+                .withUseParallelCompress(true)
+                .withInitialServerList(properties.getServerAddressList())
+                .withStoreEngineOptions(storeOpts)
+                .withPlacementDriverOptions(pdOpts)
+                .withRpcOptions(rpcOptions)
+                .config();
+        this.kvStore = new DefaultRheaKVStore();
+        this.kvStore.init(opts);
+        log.info("kvStore started...");
+        if (!properties.getModule().equalsIgnoreCase(API_TYPE)) {
+            this.subscribeListenerManager = new SubscribeListenerManager(properties, kvStore);
+            subscribeListenerManager.start();
+        }
+    }
+
+    @Override
+    public boolean subscribe(String path, SubscribeListener listener) {
+        return subscribeListenerManager.addSubscribeListener(path, listener);
+    }
+
+    @Override
+    public void unsubscribe(String path) {
+        subscribeListenerManager.removeSubscribeListener(path);
+    }
+
+    @Override
+    public void addConnectionStateListener(ConnectionListener listener) {
+        final String groupId = properties.getClusterName() + "--1";
+        final Node node = NodeManager.getInstance().get(groupId, new PeerId((properties.getServerAddress()), properties.getServerPort()));
+        node.addReplicatorStateListener(new RaftConnectionStateListener(listener));
+    }
+
+    @Override
+    public String get(String key) {
+        return readUtf8(kvStore.bGet(key));
+    }
+
+    @Override
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        kvStore.bPut(key, writeUtf8(value));
+        if (key.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
+            addWorkerGroup(key);
+        }
+    }
+
+    private void addWorkerGroup(String key) {
+        List<String> workerGroupList = getWorkerGroups();
+        String workerGroup = key.substring(key.indexOf(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)
+                + Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS.length() + 1, key.lastIndexOf(Constants.SINGLE_SLASH));
+        if (!workerGroupList.contains(workerGroup)) {
+            workerGroupList.add(workerGroup);
+            kvStore.bPut(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS, writeUtf8(JSONUtils.toJsonString(workerGroupList)));
+        }
+    }
+
+    private List<String> getWorkerGroups() {
+        final String storedWorkerGroup = readUtf8(kvStore.bGet(REGISTRY_DOLPHINSCHEDULER_WORKER_GROUPS));
+        if (StringUtils.isEmpty(storedWorkerGroup)) {
+            return new ArrayList<>();
+        }
+        return new ArrayList<>(JSONUtils.toList(storedWorkerGroup, String.class));
+    }
+
+    @Override
+    public void delete(String key) {
+        kvStore.bDelete(key);
+        final DistributedLock<byte[]> distributedLock = distributedLockMap.get(key);
+        if (distributedLock != null) {
+            distributedLock.unlock();
+        }
+        distributedLockMap.remove(key);
+    }
+
+    @Override
+    public Collection<String> children(String key) {
+        List<String> children = new ArrayList<>();
+        if (key.equals(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS)) {

Review Comment:
   Why do we need this check here? This method should just return the children under the given key.
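
   A generic alternative, sketched under assumptions: because worker keys already have the form `<workers-root>/<group>/<address>` (this is what `addWorkerGroup` parses), a prefix scan that keeps only the first path segment below the given key yields the worker groups without the separate `worker-groups` entry. A `NavigableMap` stands in for the ordered RheaKV key space here; `ChildrenScan` is hypothetical, and unlike the PR it returns names in ascending order.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.SortedSet;
   import java.util.TreeSet;

   // Hypothetical ZooKeeper-style "children" over an ordered key space with '/' separators.
   final class ChildrenScan {

       private ChildrenScan() {
       }

       static List<String> children(NavigableMap<String, ?> store, String key) {
           String prefix = key.endsWith("/") ? key : key + "/";
           // A set de-duplicates groups that have several registered servers.
           SortedSet<String> names = new TreeSet<>();
           // Assumes keys never contain U+FFFF, so every key starting with
           // `prefix` sorts inside [prefix, prefix + U+FFFF).
           for (String k : store.subMap(prefix, true, prefix + Character.MAX_VALUE, false).keySet()) {
               String rest = k.substring(prefix.length());
               int slash = rest.indexOf('/');
               // Keep only the first path segment below `key`.
               String child = slash < 0 ? rest : rest.substring(0, slash);
               if (!child.isEmpty()) {
                   names.add(child);
               }
           }
           return new ArrayList<>(names);
       }
   }
   ```

   With RheaKV, the `subMap` call would correspond to a `bScan` over the same key range.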




[GitHub] [dolphinscheduler] zhuxt2015 commented on a diff in pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on code in PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#discussion_r934004485


##########
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/EphemeralNodeManager.java:
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 1. EphemeralNodeRefreshThread checks the current master node connection and the ephemeral node expiry time
+ * 2. maintain Map<String, Long> activeMasterServers
+ * 3. maintain Map<String, Long> activeWorkerServers
+ * 4. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, List<String>> master servers
+ * 5. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, List<String>> worker servers
+ * 6. maintain Map<Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, List<String>> dead servers
+ */
+@Slf4j
+public class EphemeralNodeManager implements AutoCloseable {
+    private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
+
+    private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
+
+    private RaftRegistryProperties properties;
+
+
+    private RheaKVStore kvStore;
+
+    public EphemeralNodeManager(RaftRegistryProperties properties, RheaKVStore kvStore) {
+        this.properties = properties;
+        this.kvStore = kvStore;
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(
+            2,
+            new ThreadFactoryBuilder().setNameFormat("EphemeralNodeRefreshThread").setDaemon(true).build());
+
+    public void start() {
+        scheduledExecutorService.scheduleWithFixedDelay(new ConnectionCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+                properties.getListenerCheckInterval().toMillis(),
+                properties.getListenerCheckInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        connectionListeners.clear();
+        dataSubScribeMap.clear();
+        scheduledExecutorService.shutdown();
+    }
+
+    public void addConnectionListener(ConnectionListener listener) {
+        connectionListeners.add(listener);
+    }
+
+    public boolean addSubscribeListener(String path, SubscribeListener listener) {
+        return dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
+    }
+
+    public void removeSubscribeListener(String path) {
+        dataSubScribeMap.remove(path);
+    }
+
+    private class ConnectionCheckTask implements Runnable {
+        private ConnectionState connectionState = null;
+
+        @Override
+        public void run() {
+            checkConnection();
+            checkActiveNode();
+        }
+
+        private void checkConnection() {
+            final String host = NetUtils.getHost();
+            if (getActiveMasterServers().keySet().stream().anyMatch(address -> address.split(Constants.COLON)[0].equals(host))) {
+                if (connectionState == null && !connectionListeners.isEmpty()) {
+                    triggerListener(ConnectionState.CONNECTED);
+                } else if (connectionState == ConnectionState.DISCONNECTED) {
+                    triggerListener(ConnectionState.RECONNECTED);
+                } else {
+                    triggerListener(ConnectionState.CONNECTED);
+                }
+                connectionState = ConnectionState.CONNECTED;
+            }
+        }
+
+        private void checkActiveNode() {
+            long expireTime = properties.getConnectionExpireFactor() * properties.getListenerCheckInterval().toMillis();
+            Map<String, Long> activeMasterServers = getActiveMasterServers();
+            for (Map.Entry<String, Long> entry : activeMasterServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeMasterServers.remove(nodeAddress);
+                    updateActiveMaster(activeMasterServers);
+                    addDeadServer(Constants.MASTER_TYPE, nodeAddress);
+                    if (nodeAddress.split(Constants.COLON)[0].equals(NetUtils.getHost())) {
+                        connectionState = ConnectionState.DISCONNECTED;
+                        triggerListener(ConnectionState.DISCONNECTED);
+                        removeNodeData(nodeAddress);
+                    }
+                    log.warn("Master server {} connect to raft cluster timeout, last heartbeat {}, timeout config {} ms",
+                            nodeAddress, convertTimeToString(entry.getValue()), expireTime);
+                }
+            }
+            Map<String, Long> activeWorkerServers = getActiveWorkerServers();
+            for (Map.Entry<String, Long> entry : activeWorkerServers.entrySet()) {
+                if ((System.currentTimeMillis() - entry.getValue()) > expireTime) {
+                    final String nodeAddress = entry.getKey();
+                    activeWorkerServers.remove(nodeAddress);
+                    updateActiveWorker(nodeAddress, activeWorkerServers);
+                    removeWorkerGroup(nodeAddress);
+                    addDeadServer(Constants.WORKER_TYPE, nodeAddress);
+                    removeNodeData(nodeAddress);
+                    log.warn("Worker server {} connect to raft cluster timeout, last heartbeat {}, timeout config {} ms",
+                            nodeAddress, convertTimeToString(entry.getValue()), expireTime);
+                }
+            }
+        }
+
+        private void triggerListener(ConnectionState connectionState) {
+            connectionListeners.forEach(listener -> listener.onUpdate(connectionState));
+        }
+    }
+
+    private class SubscribeCheckTask implements Runnable {
+
+        private final Map<String, String> nodeDataMap = new ConcurrentHashMap<>();
+
+        @Override
+        public void run() {
+            subscribeCheck();
+        }
+
+        private void subscribeCheck() {
+            if (dataSubScribeMap.isEmpty()) {
+                return;
+            }
+            final Map<String, String> currentNodeDataMap = getNodeDataMap();
+            // find the differences
+            Map<String, String> addedData = new HashMap<>();
+            Map<String, String> deletedData = new HashMap<>();
+            Map<String, String> updatedData = new HashMap<>();
+            for (Map.Entry<String, String> entry : currentNodeDataMap.entrySet()) {
+                final String oldData = nodeDataMap.get(entry.getKey());
+                if (oldData == null) {
+                    addedData.put(entry.getKey(), entry.getValue());
+                } else {
+                    HeartBeat newHeartBeat = HeartBeat.decodeHeartBeat(entry.getValue());

Review Comment:
   As with ZooKeeper, the subscribe listener compares node heartbeats to decide whether to notify listeners.
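The review comment describes emulating ZooKeeper-style change notifications by polling: each check compares the current node data with the previous snapshot and sorts entries into added, deleted, and updated. The following is a minimal sketch of that diffing step. It assumes node data is held as `Map<String, String>`, as in `SubscribeCheckTask` above. The class name `NodeDataDiff` is hypothetical, and a plain string comparison stands in for the `HeartBeat.decodeHeartBeat` comparison, whose details are cut off in the hunk.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch: diffs two snapshots of key -> node data so that a
 * polling check can emit ADD / REMOVE / UPDATE style notifications.
 */
final class NodeDataDiff {

    final Map<String, String> added = new HashMap<>();
    final Map<String, String> deleted = new HashMap<>();
    final Map<String, String> updated = new HashMap<>();

    static NodeDataDiff compute(Map<String, String> previous, Map<String, String> current) {
        NodeDataDiff diff = new NodeDataDiff();
        for (Map.Entry<String, String> entry : current.entrySet()) {
            String oldData = previous.get(entry.getKey());
            if (oldData == null) {
                // present now, absent from the previous snapshot
                diff.added.put(entry.getKey(), entry.getValue());
            } else if (!Objects.equals(oldData, entry.getValue())) {
                // Assumption: a plain string comparison replaces decoding both
                // payloads with HeartBeat.decodeHeartBeat and comparing them.
                diff.updated.put(entry.getKey(), entry.getValue());
            }
        }
        for (Map.Entry<String, String> entry : previous.entrySet()) {
            if (!current.containsKey(entry.getKey())) {
                // present in the previous snapshot, gone now
                diff.deleted.put(entry.getKey(), entry.getValue());
            }
        }
        return diff;
    }
}
```

After each check, the caller would keep `current` as the new snapshot and notify the listeners registered for the affected paths.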

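`ConnectionCheckTask` in the diff treats a server as expired when `now - lastHeartbeat > connectionExpireFactor * listenerCheckInterval`. Two details of the hunk may deserve a second look. First, `checkConnection()` triggers `CONNECTED` on every pass while the node stays active. Second, `checkActiveNode()` removes entries from the map it is iterating, which can throw `ConcurrentModificationException` if `getActiveMasterServers()` returns a plain `HashMap` (its implementation is not shown). The sketch below shows one alternative: it removes through `entrySet().removeIf` and notifies listeners only on state transitions. `ConnState` and `ConnectionTracker` are hypothetical names, not the registry API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the registry API's ConnectionState enum.
enum ConnState { CONNECTED, DISCONNECTED, RECONNECTED }

/** Hypothetical sketch of expiry checks plus transition-only notifications. */
final class ConnectionTracker {
    private final long expireMillis;
    private final List<Consumer<ConnState>> listeners = new ArrayList<>();
    private ConnState state; // null until this node is first seen as active

    ConnectionTracker(long connectionExpireFactor, long checkIntervalMillis) {
        // same expiry rule as the diff: factor * check interval
        this.expireMillis = connectionExpireFactor * checkIntervalMillis;
    }

    void addListener(Consumer<ConnState> listener) {
        listeners.add(listener);
    }

    /** Removes entries whose last heartbeat is too old and returns their keys. */
    List<String> evictExpired(Map<String, Long> lastHeartbeat, long nowMillis) {
        List<String> expired = new ArrayList<>();
        // removeIf deletes through the entry-set iterator, so it is safe
        // even when the map is a plain HashMap
        lastHeartbeat.entrySet().removeIf(e -> {
            boolean isExpired = nowMillis - e.getValue() > expireMillis;
            if (isExpired) {
                expired.add(e.getKey());
            }
            return isExpired;
        });
        return expired;
    }

    /** Called once per check; notifies listeners only when the state changes. */
    void onCheck(boolean selfIsActive) {
        if (selfIsActive) {
            if (state == null) {
                fire(ConnState.CONNECTED);
            } else if (state == ConnState.DISCONNECTED) {
                fire(ConnState.RECONNECTED);
            }
            state = ConnState.CONNECTED; // already CONNECTED: stay silent
        } else if (state == ConnState.CONNECTED) {
            state = ConnState.DISCONNECTED;
            fire(ConnState.DISCONNECTED);
        }
    }

    private void fire(ConnState s) {
        listeners.forEach(l -> l.accept(s));
    }
}
```

Whether repeated `CONNECTED` callbacks are intended in the PR is not stated; the transition-only variant simply avoids re-notifying listeners on every interval.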

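`addSubscribeListener` in the diff stores listeners in a `new ArrayList<>()` created inside `ConcurrentHashMap.computeIfAbsent`. The map makes the lookup-or-create step atomic, but the following `add` runs on a plain `ArrayList` outside that step. Concurrent registrations for the same path are therefore unprotected, and so is a registration that happens while the check task (whose dispatch code is cut off in the hunk) iterates the same list. The following is a minimal sketch of a thread-safe alternative built on `CopyOnWriteArrayList`. `ListenerRegistry` and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical sketch of a thread-safe path -> listeners registry. */
final class ListenerRegistry<L> {
    private final Map<String, List<L>> byPath = new ConcurrentHashMap<>();

    boolean add(String path, L listener) {
        // add() runs after computeIfAbsent returns, outside the map's
        // per-key locking, so the list itself must be thread-safe
        return byPath.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void remove(String path) {
        byPath.remove(path);
    }

    /** forEach on a CopyOnWriteArrayList walks a snapshot of the list. */
    void dispatch(String path, Consumer<L> action) {
        List<L> listeners = byPath.get(path);
        if (listeners != null) {
            listeners.forEach(action);
        }
    }
}
```

Copy-on-write copies the backing array on every registration, which is usually acceptable for listener lists that change rarely but are read on every check interval.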

[GitHub] [dolphinscheduler] zhuxt2015 commented on pull request #11144: [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency

Posted by GitBox <gi...@apache.org>.
zhuxt2015 commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1229650082

   > Hi @zhuxt2015 please check the review comment and resolve conflicts.
   
   OK, testing locally.


Re: [PR] [DSIP-9][Feature][Server] Add Raft consensus algorithm registry, remove zookeeper dependency [dolphinscheduler]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #11144:
URL: https://github.com/apache/dolphinscheduler/pull/11144#issuecomment-1947546991

   This pull request has been closed because it has had no recent activity. You can reopen it if you want to continue your work, and anyone who is interested is encouraged to continue work on this pull request.

