You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by va...@apache.org on 2022/09/16 02:53:15 UTC

[incubator-hugegraph] branch master updated: feat(core): cluster role automatic management (#1943)

This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new 51d29ac3a feat(core): cluster role automatic management (#1943)
51d29ac3a is described below

commit 51d29ac3a0ec04c43c99e5262655d344d873fc1d
Author: zyxxoo <13...@qq.com>
AuthorDate: Fri Sep 16 10:53:06 2022 +0800

    feat(core): cluster role automatic management (#1943)
    
    * #1869 cluster role automatic management
    
    * improve code
    
    * FIX bug
    
    * improve code
    
    * improve code
    
    * improve code
    
    * improve code
    
    * improve code
    
    * improve code
---
 .../java/com/baidu/hugegraph/election/Config.java  |  35 +++
 .../election/RoleElectionStateMachine.java         |  27 ++
 .../election/RoleElectionStateMachineImpl.java     | 312 ++++++++++++++++++++
 .../com/baidu/hugegraph/election/RoleTypeData.java |  91 ++++++
 .../hugegraph/election/RoleTypeDataAdapter.java    |  29 ++
 .../hugegraph/election/StateMachineCallback.java   |  35 +++
 .../hugegraph/election/StateMachineContext.java    |  37 +++
 .../com/baidu/hugegraph/core/CoreTestSuite.java    |   3 +-
 .../core/RoleElectionStateMachineTest.java         | 318 +++++++++++++++++++++
 9 files changed, 886 insertions(+), 1 deletion(-)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
new file mode 100644
index 000000000..ad39c0b5e
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+public interface Config {
+
+    String node();
+
+    int exceedsFailCount();
+
+    long randomTimeoutMillisecond();
+
+    long heartBeatIntervalSecond();
+
+    int exceedsWorkerCount();
+
+    long baseTimeoutMillisecond();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
new file mode 100644
index 000000000..4bc258623
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+public interface RoleElectionStateMachine {
+
+    void shutdown();
+
+    void apply(StateMachineCallback stateMachineCallback);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
new file mode 100644
index 000000000..016f51cb8
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+import java.security.SecureRandom;
+import java.util.Optional;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.util.E;
+
+public class RoleElectionStateMachineImpl implements RoleElectionStateMachine {
+
+    private volatile boolean shutdown;
+    private final Config config;
+    private volatile RoleState state;
+    private final RoleTypeDataAdapter roleTypeDataAdapter;
+
+    public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) {
+        this.config = config;
+        this.roleTypeDataAdapter = adapter;
+        this.state = new UnknownState(null);
+        this.shutdown = false;
+    }
+
+    @Override
+    public void shutdown() {
+        this.shutdown = true;
+    }
+
+    @Override
+    public void apply(StateMachineCallback stateMachineCallback) {
+        int failCount = 0;
+        StateMachineContextImpl context = new StateMachineContextImpl(this);
+        while (!this.shutdown) {
+            E.checkArgumentNotNull(this.state, "State don't be null");
+            try {
+                this.state = state.transform(context);
+                Callback runnable = this.state.callback(stateMachineCallback);
+                runnable.call(context);
+                failCount = 0;
+            } catch (Throwable e) {
+                stateMachineCallback.error(context, e);
+                failCount ++;
+                if (failCount >= this.config.exceedsFailCount()) {
+                    this.state = new AbdicationState(context.epoch());
+                    Callback runnable = this.state.callback(stateMachineCallback);
+                    runnable.call(context);
+                }
+            }
+        }
+    }
+
+    private interface RoleState {
+
+        SecureRandom secureRandom = new SecureRandom();
+
+        RoleState transform(StateMachineContext context);
+
+        Callback callback(StateMachineCallback callback);
+
+        static void heartBeatPark(StateMachineContext context) {
+            long heartBeatIntervalSecond = context.config().heartBeatIntervalSecond();
+            LockSupport.parkNanos(heartBeatIntervalSecond * 1_000_000_000);
+        }
+
+        static void randomPark(StateMachineContext context) {
+            long randomTimeout = context.config().randomTimeoutMillisecond();
+            long baseTime = context.config().baseTimeoutMillisecond();
+            long timeout = (long) (baseTime + (randomTimeout / 10.0 * secureRandom.nextInt(11)));
+            LockSupport.parkNanos(timeout * 1_000_000);
+        }
+    }
+
+    @FunctionalInterface
+    private interface Callback {
+
+        void call(StateMachineContext context);
+    }
+
+    private static class UnknownState implements RoleState {
+
+        final Integer epoch;
+
+        public UnknownState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleTypeDataAdapter adapter = context.adapter();
+            Optional<RoleTypeData> roleTypeDataOpt = adapter.query();
+            if (!roleTypeDataOpt.isPresent()) {
+                context.reset();
+                Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1;
+                context.epoch(nextEpoch);
+                return new CandidateState(nextEpoch);
+            }
+
+            RoleTypeData roleTypeData = roleTypeDataOpt.get();
+            if (this.epoch != null && roleTypeData.epoch() < this.epoch) {
+                context.reset();
+                Integer nextEpoch = this.epoch + 1;
+                context.epoch(nextEpoch);
+                return new CandidateState(nextEpoch);
+            }
+
+            context.epoch(roleTypeData.epoch());
+            if (roleTypeData.isMaster(context.node())) {
+                return new MasterState(roleTypeData);
+            } else {
+                return new WorkerState(roleTypeData);
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::unknown;
+        }
+    }
+
+    private static class AbdicationState implements RoleState {
+
+        private final Integer epoch;
+
+        public AbdicationState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            return new UnknownState(this.epoch).transform(context);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::abdication;
+        }
+    }
+
+    private static class MasterState implements RoleState {
+
+        private final RoleTypeData roleTypeData;
+
+        public MasterState(RoleTypeData roleTypeData) {
+            this.roleTypeData = roleTypeData;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            this.roleTypeData.increaseClock();
+            RoleState.heartBeatPark(context);
+            if (context.adapter().updateIfNodePresent(this.roleTypeData)) {
+                return this;
+            }
+            context.reset();
+            context.epoch(this.roleTypeData.epoch());
+            return new UnknownState(this.roleTypeData.epoch()).transform(context);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::master;
+        }
+    }
+
+    private static class WorkerState implements RoleState {
+
+        private RoleTypeData roleTypeData;
+        private int clock;
+
+        public WorkerState(RoleTypeData roleTypeData) {
+            this.roleTypeData = roleTypeData;
+            this.clock = 0;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            RoleState nextState = new UnknownState(this.roleTypeData.epoch()).transform(context);
+            if (nextState instanceof WorkerState) {
+                this.merge((WorkerState) nextState);
+                if (this.clock > context.config().exceedsWorkerCount()) {
+                    return new CandidateState(this.roleTypeData.epoch() + 1);
+                } else {
+                    return this;
+                }
+            } else {
+                return nextState;
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::worker;
+        }
+
+        public void merge(WorkerState state) {
+            if (state.roleTypeData.epoch() > this.roleTypeData.epoch()) {
+                this.clock = 0;
+                this.roleTypeData = state.roleTypeData;
+            } else if (state.roleTypeData.epoch() < this.roleTypeData.epoch()){
+                throw new IllegalStateException("Epoch must increase");
+            } else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
+                       state.roleTypeData.clock() < this.roleTypeData.clock()) {
+                throw new IllegalStateException("Clock must increase");
+            } else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
+                       state.roleTypeData.clock() > this.roleTypeData.clock()) {
+                this.clock = 0;
+                this.roleTypeData = state.roleTypeData;
+            } else {
+                this.clock++;
+            }
+        }
+    }
+
+    private static class CandidateState implements RoleState {
+
+        private final Integer epoch;
+
+        public CandidateState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.randomPark(context);
+            int epoch = this.epoch == null ? 1 : this.epoch;
+            RoleTypeData roleTypeData = new RoleTypeData(context.config().node(), epoch);
+            //failover to master success
+            context.epoch(roleTypeData.epoch());
+            if (context.adapter().updateIfNodePresent(roleTypeData)) {
+                return new MasterState(roleTypeData);
+            } else {
+                return new UnknownState(epoch).transform(context);
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::candidate;
+        }
+    }
+
+    private static class StateMachineContextImpl implements StateMachineContext {
+
+        private Integer epoch;
+        private final String node;
+        private final RoleElectionStateMachineImpl machine;
+
+        public StateMachineContextImpl(RoleElectionStateMachineImpl machine) {
+            this.node = machine.config.node();
+            this.machine = machine;
+        }
+
+        @Override
+        public Integer epoch() {
+            return this.epoch;
+        }
+
+        @Override
+        public String node() {
+            return this.node;
+        }
+
+        @Override
+        public void epoch(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleTypeDataAdapter adapter() {
+            return this.machine.adapter();
+        }
+
+        @Override
+        public Config config() {
+            return this.machine.config;
+        }
+
+        @Override
+        public RoleElectionStateMachine stateMachine() {
+            return this.machine;
+        }
+
+        @Override
+        public void reset() {
+            this.epoch = null;
+        }
+    }
+
+    protected RoleTypeDataAdapter adapter() {
+        return this.roleTypeDataAdapter;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java
new file mode 100644
index 000000000..aa60c47ac
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+import java.util.Objects;
+
+public class RoleTypeData {
+
+    private String node;
+    private long clock;
+    private int epoch;
+
+    public RoleTypeData(String node, int epoch) {
+        this(node, epoch, 1);
+    }
+
+    public RoleTypeData(String node, int epoch, long clock) {
+        this.node = node;
+        this.epoch = epoch;
+        this.clock = clock;
+    }
+
+    public void increaseClock() {
+        this.clock++;
+    }
+
+    public boolean isMaster(String node) {
+        return Objects.equals(this.node, node);
+    }
+
+    public int epoch() {
+        return this.epoch;
+    }
+
+    public long clock() {
+        return this.clock;
+    }
+
+    public void clock(long clock) {
+        this.clock = clock;
+    }
+
+    public String node() {
+        return this.node;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof RoleTypeData)) {
+            return false;
+        }
+        RoleTypeData metaData = (RoleTypeData) obj;
+        return clock == metaData.clock &&
+               epoch == metaData.epoch &&
+               Objects.equals(node, metaData.node);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(node, clock, epoch);
+    }
+
+    @Override
+    public String toString() {
+        return "RoleStateData{" +
+                "node='" + node + '\'' +
+                ", clock=" + clock +
+                ", epoch=" + epoch +
+                '}';
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java
new file mode 100644
index 000000000..41a2021f0
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+import java.util.Optional;
+
+public interface RoleTypeDataAdapter {
+
+    boolean updateIfNodePresent(RoleTypeData stateData);
+
+    Optional<RoleTypeData> query();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
new file mode 100644
index 000000000..8403b5950
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+public interface StateMachineCallback {
+
+    void master(StateMachineContext context);
+
+    void worker(StateMachineContext context);
+
+    void candidate(StateMachineContext context);
+
+    void unknown(StateMachineContext context);
+
+    void abdication(StateMachineContext context);
+
+    void error(StateMachineContext context, Throwable e);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
new file mode 100644
index 000000000..a3693f5fa
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+public interface StateMachineContext {
+
+    Integer epoch();
+
+    String node();
+
+    RoleElectionStateMachine stateMachine();
+
+    void epoch(Integer epoch);
+
+    Config config();
+
+    RoleTypeDataAdapter adapter();
+
+    void reset();
+}
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java
index 509cdf814..d6dd4c350 100644
--- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java
@@ -49,7 +49,8 @@ import com.baidu.hugegraph.util.Log;
     TaskCoreTest.class,
     AuthTest.class,
     MultiGraphsTest.class,
-    RamTableTest.class
+    RamTableTest.class,
+    RoleElectionStateMachineTest.class
 })
 public class CoreTestSuite {
 
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
new file mode 100644
index 000000000..6e8af972d
--- /dev/null
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.election.Config;
+import com.baidu.hugegraph.election.RoleElectionStateMachine;
+import com.baidu.hugegraph.election.RoleElectionStateMachineImpl;
+import com.baidu.hugegraph.election.RoleTypeData;
+import com.baidu.hugegraph.election.RoleTypeDataAdapter;
+import com.baidu.hugegraph.election.StateMachineCallback;
+import com.baidu.hugegraph.election.StateMachineContext;
+import com.baidu.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class RoleElectionStateMachineTest {
+
+    public static class LogEntry {
+
+        Integer epoch;
+
+        String node;
+
+        Role role;
+
+        enum Role {
+            master,
+            worker,
+            candidate,
+            abdication,
+            unknown
+        }
+
+        public LogEntry(Integer epoch, String node, Role role) {
+            this.epoch = epoch;
+            this.node = node;
+            this.role = role;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof LogEntry)) {
+                return false;
+            }
+            LogEntry logEntry = (LogEntry) obj;
+            return Objects.equals(epoch, logEntry.epoch) &&
+                   Objects.equals(node, logEntry.node) && role == logEntry.role;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(epoch, node, role);
+        }
+
+        @Override
+        public String toString() {
+            return "LogEntry{" +
+                    "epoch=" + epoch +
+                    ", node='" + node + '\'' +
+                    ", role=" + role +
+                    '}';
+        }
+    }
+
+    private static class TestConfig implements Config {
+
+        String node;
+
+        public TestConfig(String node) {
+            this.node = node;
+        }
+
+        @Override
+        public String node() {
+            return this.node;
+        }
+
+        @Override
+        public int exceedsFailCount() {
+            return 2;
+        }
+
+        @Override
+        public long randomTimeoutMillisecond() {
+            return 400;
+        }
+
+        @Override
+        public long heartBeatIntervalSecond() {
+            return 1;
+        }
+
+        @Override
+        public int exceedsWorkerCount() {
+            return 5;
+        }
+
+        @Override
+        public long baseTimeoutMillisecond() {
+            return 100;
+        }
+    }
+
+    @Test
+    public void testStateMachine() throws InterruptedException {
+        final CountDownLatch stop = new CountDownLatch(4);
+        final int MAX_COUNT = 200;
+        final List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
+        final List<String> masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
+        final StateMachineCallback callback = new StateMachineCallback() {
+
+            @Override
+            public void master(StateMachineContext context) {
+                Integer epochId = context.epoch();
+                String node = context.node();
+                logRecords.add(new LogEntry(epochId, node, LogEntry.Role.master));
+                if (logRecords.size() > MAX_COUNT) {
+                    context.stateMachine().shutdown();
+                }
+                System.out.println("master node: " + node);
+                masterNodes.add(node);
+            }
+
+            @Override
+            public void worker(StateMachineContext context) {
+                Integer epochId = context.epoch();
+                String node = context.node();
+                logRecords.add(new LogEntry(epochId, node, LogEntry.Role.worker));
+                if (logRecords.size() > MAX_COUNT) {
+                    context.stateMachine().shutdown();
+                }
+            }
+
+            @Override
+            public void candidate(StateMachineContext context) {
+                Integer epochId = context.epoch();
+                String node = context.node();
+                logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate));
+                if (logRecords.size() > MAX_COUNT) {
+                    context.stateMachine().shutdown();
+                }
+            }
+
+            @Override
+            public void unknown(StateMachineContext context) {
+                Integer epochId = context.epoch();
+                String node = context.node();
+                logRecords.add(new LogEntry(epochId, node, LogEntry.Role.unknown));
+                if (logRecords.size() > MAX_COUNT) {
+                    context.stateMachine().shutdown();
+                }
+            }
+
+            @Override
+            public void abdication(StateMachineContext context) {
+                Integer epochId = context.epoch();
+                String node = context.node();
+                logRecords.add(new LogEntry(epochId, node, LogEntry.Role.abdication));
+                if (logRecords.size() > MAX_COUNT) {
+                    context.stateMachine().shutdown();
+                }
+            }
+
+            @Override
+            public void error(StateMachineContext context, Throwable e) {
+                System.out.println("state machine error: node " + context.node() + " message " + e.getMessage());
+            }
+        };
+
+        final List<RoleTypeData> metaDataLogs = Collections.synchronizedList(new ArrayList<>(100));
+        final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() {
+
+            volatile int epoch = 0;
+
+            final Map<Integer, RoleTypeData> data = new ConcurrentHashMap<>();
+
+            RoleTypeData copy(RoleTypeData stateData) {
+                if (stateData == null) {
+                    return null;
+                }
+                return new RoleTypeData(stateData.node(), stateData.epoch(), stateData.clock());
+            }
+
+            @Override
+            public boolean updateIfNodePresent(RoleTypeData stateData) {
+                if (stateData.epoch() < this.epoch) {
+                    return false;
+                }
+
+                RoleTypeData copy = this.copy(stateData);
+                RoleTypeData newData = data.compute(copy.epoch(), (key, value) -> {
+                    if (copy.epoch() > this.epoch) {
+                        this.epoch = copy.epoch();
+                        Assert.assertNull(value);
+                        metaDataLogs.add(copy);
+                        System.out.println("The node " + copy + " become new master:");
+                        return copy;
+                    }
+
+                    Assert.assertEquals(value.epoch(), copy.epoch());
+                    if (Objects.equals(value.node(), copy.node()) &&
+                        value.clock() <= copy.clock()) {
+                        System.out.println("The master node " + copy + " keep heartbeat");
+                        metaDataLogs.add(copy);
+                        if (value.clock() == copy.clock()) {
+                            Assert.fail("Clock must increase when same epoch and node id");
+                        }
+                        return copy;
+                    }
+                    return value;
+
+                });
+                return Objects.equals(newData, copy);
+            }
+
+            @Override
+            public Optional<RoleTypeData> query() {
+                return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
+            }
+        };
+
+        RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4];
+        Thread node1 = new Thread(() -> {
+            Config config = new TestConfig("1");
+            RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+            machines[1] = stateMachine;
+            stateMachine.apply(callback);
+            stop.countDown();
+        });
+
+        Thread node2 = new Thread(() -> {
+            Config config = new TestConfig("2");
+            RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+            machines[2] = stateMachine;
+            stateMachine.apply(callback);
+            stop.countDown();
+        });
+
+        Thread node3 = new Thread(() -> {
+            Config config = new TestConfig("3");
+            RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+            machines[3] = stateMachine;
+            stateMachine.apply(callback);
+            stop.countDown();
+        });
+
+        node1.start();
+        node2.start();
+        node3.start();
+
+        Thread randomShutdown = new Thread(() -> {
+            Set<String> dropNodes = new HashSet<>();
+            while (dropNodes.size() < 3) {
+                LockSupport.parkNanos(5_000_000_000L);
+                int size = masterNodes.size();
+                if (size < 1) {
+                    continue;
+                }
+                String node = masterNodes.get(size - 1);
+                if (dropNodes.contains(node)) {
+                    continue;
+                }
+                machines[Integer.parseInt(node)].shutdown();
+                dropNodes.add(node);
+                System.out.println("----shutdown machine " + node);
+            }
+            stop.countDown();
+        });
+
+        randomShutdown.start();
+        stop.await();
+
+        Assert.assertGt(0, logRecords.size());
+        Map<Integer, String> masters = new HashMap<>();
+        for (LogEntry entry: logRecords) {
+            if (entry.role == LogEntry.Role.master) {
+                String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
+                if (lastNode != null) {
+                    Assert.assertEquals(lastNode, entry.node);
+                }
+            }
+        }
+
+        Assert.assertGt(0, masters.size());
+    }
+}