You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by va...@apache.org on 2022/08/15 10:30:36 UTC

[incubator-hugegraph] 01/01: #1869 cluster role automatic management

This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit ed3f3cacdd9f76b31175ed1ae67d20a043e5aec0
Author: zyxxoo <13...@qq.com>
AuthorDate: Mon Aug 15 18:30:17 2022 +0800

    #1869 cluster role automatic management
---
 .../java/com/baidu/hugegraph/election/Config.java  |  14 ++
 .../com/baidu/hugegraph/election/MetaData.java     |  45 ++++
 .../baidu/hugegraph/election/MetaDataAdapter.java  |  11 +
 .../election/RoleElectionStateMachine.java         |   9 +
 .../election/RoleElectionStateMachineImpl.java     | 277 +++++++++++++++++++++
 .../hugegraph/election/StateMachineCallback.java   |  16 ++
 .../hugegraph/election/StateMachineContext.java    |  17 ++
 .../core/RoleElectionStateMachineTest.java         | 214 ++++++++++++++++
 8 files changed, 603 insertions(+)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
new file mode 100644
index 000000000..aa49ecc8a
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
@@ -0,0 +1,14 @@
+package com.baidu.hugegraph.election;
+
+public interface Config {
+    /** Name of the local node; the state machine compares it with the node stored in the master record. */
+    String node();
+    /** Number of consecutive failed rounds after which the state machine switches to the safe state. */
+    int exceedsFailCount();
+    /** Time in milliseconds a candidate parks before it tries to claim mastership. */
+    long randomTimeoutMillisecond();
+    /** Time in seconds the master, worker and safe states park at the start of a round. */
+    long heartBeatIntervalSecond();
+    /** A worker turns candidate once its count of unchanged observations exceeds this value. */
+    int exceedsWorkerCount();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java
new file mode 100644
index 000000000..684c2fb12
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java
@@ -0,0 +1,45 @@
+package com.baidu.hugegraph.election;
+
+import java.util.Objects;
+
+public class MetaData {
+    // Election record: a node name, a count and an epoch.
+    String node;
+    long count;
+    int epoch;
+    /** Builds the record for {@code node} at {@code epoch}; the count starts at 1. */
+    public MetaData(String node, int epoch) {
+        this.count = 1;
+        this.epoch = epoch;
+        this.node = node;
+    }
+    /** Adds one to the count. */
+    public void increaseCount() {
+        this.count += 1;
+    }
+    /** Tells whether {@code node} equals the node stored in this record. */
+    public boolean isMaster(String node) {
+        return Objects.equals(node, this.node);
+    }
+    /** Returns the epoch of this record. */
+    public int epoch() {
+        return this.epoch;
+    }
+    /** Returns the current count. */
+    public long count() {
+        return this.count;
+    }
+    /** Two records are equal when node, count and epoch all match. */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MetaData)) {
+            return false;
+        }
+        MetaData that = (MetaData) o;
+        return this.count == that.count && this.epoch == that.epoch && Objects.equals(this.node, that.node);
+    }
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.node, this.count, this.epoch);
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java
new file mode 100644
index 000000000..c879c1f6d
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java
@@ -0,0 +1,11 @@
+package com.baidu.hugegraph.election;
+
+import java.util.Optional;
+
+public interface MetaDataAdapter { // store the state machine uses to read and to claim the master record
+    boolean postDelyIfPresent(MetaData metaData, long delySecond); // callers treat true as "this node holds mastership"
+    /** NOTE(review): not called by the state machine in this patch; presumably query() after a delay in seconds - confirm. */
+    Optional<MetaData> queryDelay(long delySecond);
+    /** Returns the current record; the state machine treats an empty result as "no master recorded". */
+    Optional<MetaData> query();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
new file mode 100644
index 000000000..666c836a7
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
@@ -0,0 +1,9 @@
+package com.baidu.hugegraph.election;
+
+public interface RoleElectionStateMachine {
+    /** Asks the loop started by {@link #apply} to stop. */
+    void shutdown();
+    /** Runs the election loop and reports the role reached in each round to {@code stateMachineCallback}. */
+    void apply(StateMachineCallback stateMachineCallback);
+    // The implementation in this patch stays inside apply() until shutdown() has been called.
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
new file mode 100644
index 000000000..b8d96026b
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
@@ -0,0 +1,277 @@
+package com.baidu.hugegraph.election;
+
+import java.util.Optional;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.util.E;
+
+public class RoleElectionStateMachineImpl implements RoleElectionStateMachine{
+
+    private volatile boolean shutdown = false;
+    private Config config;
+
+    private volatile RoleState state;
+
+    private final MetaDataAdapter metaDataAdapter;
+
+    public RoleElectionStateMachineImpl(Config config, MetaDataAdapter adapter) {
+        this.state = new UnKnownState(null); // no epoch is known before the first query
+        this.metaDataAdapter = adapter;
+        this.config = config;
+    }
+
+    @Override
+    public void shutdown() {
+        this.shutdown = true; // volatile flag that the loop in apply() checks before every round
+    }
+
+    /** Runs election rounds on the calling thread until {@link #shutdown()} is called. */
+    @Override
+    public void apply(StateMachineCallback stateMachineCallback) {
+        int failCount = 0;
+        // One context for the whole run: MasterState never sets the epoch, so a fresh context per round reported null.
+        StateMachineContextImpl context = new StateMachineContextImpl(this);
+        while (!this.shutdown) {
+            E.checkArgumentNotNull(this.state, "State don't be null");
+            try {
+                this.state = this.state.transform(context);
+                this.state.callback(stateMachineCallback).call(context);
+                failCount = 0;
+            } catch (Throwable e) {
+                stateMachineCallback.error(context, e);
+                failCount++;
+                if (failCount >= this.config.exceedsFailCount()) {
+                    this.state = new SafeState(context.epoch());
+                    this.state.callback(stateMachineCallback).call(context);
+                }
+            }
+        }
+    }
+
+    private interface RoleState {
+        /** Performs one round for this state and returns the state to use next. */
+        RoleState transform(StateMachineContext context);
+        /** Selects the {@link StateMachineCallback} method that reports this state. */
+        Callback callback(StateMachineCallback callback);
+        /** Parks the calling thread for at most the configured heartbeat interval (seconds). */
+        static void heartBeatPark(StateMachineContext context) {
+            long nanosPerSecond = 1_000_000_000L;
+            LockSupport.parkNanos(context.config().heartBeatIntervalSecond() * nanosPerSecond);
+        }
+        /** Parks the calling thread for at most the configured timeout (milliseconds), used as-is. */
+        static void randomPark(StateMachineContext context) {
+            long nanosPerMilli = 1_000_000L;
+            LockSupport.parkNanos(context.config().randomTimeoutMillisecond() * nanosPerMilli);
+        }
+    }
+
+    @FunctionalInterface
+    private interface Callback {
+        // Each RoleState binds this to one StateMachineCallback method, e.g. callback::master.
+        void call(StateMachineContext context);
+    }
+
+    private static class UnKnownState implements RoleState {
+
+        Integer epoch;
+
+        public UnKnownState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            MetaDataAdapter adapter = context.adapter();
+            Optional<MetaData> metaDataOpt = adapter.query();
+            if (!metaDataOpt.isPresent()) {
+                context.reset();
+                return new CandidateState(epoch == null ? 1 : epoch + 1);
+            }
+
+            MetaData metaData = metaDataOpt.get();
+            context.epoch(metaData.epoch());
+            if (metaData.isMaster(context.node())) {
+                return new MasterState(metaData);
+            } else {
+                return new WorkerState(metaData);
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::unknown;
+        }
+    }
+
+    private static class SafeState implements RoleState {
+
+        Integer epoch;
+
+        public SafeState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            return new UnKnownState(this.epoch).transform(context);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::safe;
+        }
+    }
+
+    private static class MasterState implements RoleState {
+
+        MetaData metaData;
+
+        public MasterState(MetaData metaData) {
+            this.metaData = metaData;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            this.metaData.increaseCount();
+            RoleState.heartBeatPark(context);
+            if (context.adapter().postDelyIfPresent(this.metaData, -1)) {
+                return this;
+            }
+            context.reset();
+            return new UnKnownState(this.metaData.epoch());
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::master;
+        }
+    }
+
+    private static class WorkerState implements RoleState {
+
+        private MetaData metaData;
+        private int count = 0;
+
+        public WorkerState(MetaData metaData) {
+            this.metaData = metaData;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            RoleState nextState = new UnKnownState(this.metaData.epoch()).transform(context);
+            if (nextState instanceof WorkerState) {
+                this.merge((WorkerState) nextState);
+                if (this.count > context.config().exceedsWorkerCount()) {
+                    return new CandidateState(this.metaData.epoch() + 1);
+                } else {
+                    return this;
+                }
+            } else {
+                return nextState;
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::worker;
+        }
+
+        public void merge(WorkerState state) {
+            if (state.metaData.epoch() > this.metaData.epoch()) {
+                this.count = 0;
+                this.metaData = state.metaData;
+            } else if (state.metaData.epoch() < this.metaData.epoch()){
+                throw new IllegalStateException("Epoch must increase");
+            } else if (state.metaData.epoch() == this.metaData.epoch() &&
+                    state.metaData.count() < this.metaData.count()) {
+                throw new IllegalStateException("Meta count must increase");
+            } else if (state.metaData.epoch() == this.metaData.epoch() &&
+                       state.metaData.count() > this.metaData.count()) {
+                this.metaData = state.metaData;
+            } else {
+                this.count ++;
+            }
+        }
+    }
+
+    private static class CandidateState implements RoleState {
+        // Epoch this node stands for; null is treated as epoch 1.
+        Integer epoch;
+
+        public CandidateState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        /**
+         * Waits the configured timeout, then posts a record naming this node as master.
+         */
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.randomPark(context);
+            int targetEpoch = this.epoch != null ? this.epoch : 1;
+            MetaData proposal = new MetaData(context.config().node(), targetEpoch);
+            context.epoch(proposal.epoch());
+            boolean won = context.adapter().postDelyIfPresent(proposal, -1);
+            // An accepted post makes this node master; otherwise it continues as a worker.
+            return won ? new MasterState(proposal) : new WorkerState(proposal);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::candidate;
+        }
+    }
+
+    private static class StateMachineContextImpl implements StateMachineContext {
+        // Context passed to states and callbacks; config and adapter are read from the owning machine.
+        private Integer epoch;
+        private final String node;
+        private final RoleElectionStateMachineImpl machine;
+        /** Captures the node name from the machine's config at construction time; the epoch starts as null. */
+        public StateMachineContextImpl(RoleElectionStateMachineImpl machine) {
+            this.node = machine.config.node();
+            this.machine = machine;
+        }
+        /** Returns the epoch last stored with {@link #epoch(Integer)}, or null if none is set. */
+        @Override
+        public Integer epoch() {
+            return this.epoch;
+        }
+
+        @Override
+        public String node() {
+            return this.node;
+        }
+
+        @Override
+        public void epoch(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public MetaDataAdapter adapter() {
+            return this.machine.adapter();
+        }
+
+        @Override
+        public Config config() {
+            return this.machine.config;
+        }
+
+        @Override
+        public RoleElectionStateMachine stateMachine() {
+            return this.machine;
+        }
+        /** Clears the stored epoch. */
+        @Override
+        public void reset() {
+            this.epoch = null;
+        }
+    }
+
+    protected MetaDataAdapter adapter() {
+        return this.metaDataAdapter; // the adapter that was passed to the constructor
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
new file mode 100644
index 000000000..b765c455c
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
@@ -0,0 +1,16 @@
+package com.baidu.hugegraph.election;
+
+public interface StateMachineCallback {
+    /** Invoked after a round that left the node in the master state. */
+    void master(StateMachineContext context);
+    /** Invoked after a round that left the node in the worker state. */
+    void worker(StateMachineContext context);
+    /** Invoked after a round that left the node in the candidate state. */
+    void candidate(StateMachineContext context);
+    /** Invoked after a round that left the node in the unknown state. */
+    void unknown(StateMachineContext context);
+    /** Invoked when the node is in the safe state, which is entered after repeated failures. */
+    void safe(StateMachineContext context);
+    /** Invoked when a state transition or one of the role callbacks threw {@code e}. */
+    void error(StateMachineContext context, Throwable e);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
new file mode 100644
index 000000000..9b4ca07ab
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
@@ -0,0 +1,17 @@
+package com.baidu.hugegraph.election;
+
+public interface StateMachineContext {
+    Integer epoch(); // epoch last stored through epoch(Integer); the implementation in this patch returns null when none is set
+    /** Name of the local node. */
+    String node();
+    /** The state machine this context belongs to. */
+    RoleElectionStateMachine stateMachine();
+    /** Stores the epoch that later {@link #epoch()} calls return. */
+    void epoch(Integer epoch);
+    /** Configuration of the owning state machine. */
+    Config config();
+    /** Adapter the owning state machine uses to read and post the master record. */
+    MetaDataAdapter adapter();
+    /** Clears the stored epoch. */
+    void reset();
+}
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
new file mode 100644
index 000000000..e35952a8f
--- /dev/null
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
@@ -0,0 +1,214 @@
+package com.baidu.hugegraph.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.election.Config;
+import com.baidu.hugegraph.election.MetaData;
+import com.baidu.hugegraph.election.MetaDataAdapter;
+import com.baidu.hugegraph.election.RoleElectionStateMachine;
+import com.baidu.hugegraph.election.RoleElectionStateMachineImpl;
+import com.baidu.hugegraph.election.StateMachineCallback;
+import com.baidu.hugegraph.election.StateMachineContext;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RoleElectionStateMachineTest {
+
+    public static class LogEntry {
+        // One callback observation: which node reported which role at which epoch.
+        Integer epoch;
+        String node;
+        Role role;
+
+        enum Role {
+            master,
+            worker,
+            candidate,
+            safe,
+            unknown
+        }
+
+        public LogEntry(Integer epoch, String node, Role role) {
+            this.role = role;
+            this.node = node;
+            this.epoch = epoch;
+        }
+
+        /** Entries are equal when epoch, node and role all match. */
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LogEntry)) {
+                return false;
+            }
+            LogEntry that = (LogEntry) o;
+            return this.role == that.role &&
+                   Objects.equals(this.node, that.node) &&
+                   Objects.equals(this.epoch, that.epoch);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(this.epoch, this.node, this.role);
+        }
+
+        // Same text as before: epoch, single-quoted node and role inside "LogEntry{...}".
+        @Override
+        public String toString() {
+            return String.format("LogEntry{epoch=%s, node='%s', role=%s}", this.epoch, this.node, this.role);
+        }
+    }
+
+    private static class TestConfig implements Config {
+        // Fixed test settings; only the node name differs between the simulated nodes.
+        String node;
+
+        public TestConfig(String node) {
+            this.node = node;
+        }
+
+        @Override
+        public String node() {
+            return this.node;
+        }
+
+        @Override
+        public int exceedsFailCount() {
+            return 10; // failed rounds before the safe state
+        }
+
+        @Override
+        public long randomTimeoutMillisecond() {
+            return 400; // candidate wait, in milliseconds
+        }
+
+        @Override
+        public long heartBeatIntervalSecond() {
+            return 1; // one round per second
+        }
+
+        @Override
+        public int exceedsWorkerCount() {
+            return 5; // unchanged observations a worker accepts before standing as candidate
+        }
+    }
+
+    /**
+     * Runs three nodes against one shared in-memory adapter until more than
+     * {@code maxRecords} role callbacks were logged, then checks that no epoch
+     * ever had two different masters.
+     */
+    @Test
+    public void testStateMachine() throws InterruptedException {
+        final int maxRecords = 100;
+        final String[] nodes = {"1", "2", "3"};
+        final CountDownLatch stop = new CountDownLatch(nodes.length);
+        final List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(20));
+        final StateMachineCallback callback = new StateMachineCallback() {
+
+            // Logs the observation and, once enough records exist, stops the reporting
+            // machine so that its apply() call returns and the latch below can open.
+            private void log(StateMachineContext context, LogEntry.Role role) {
+                logRecords.add(new LogEntry(context.epoch(), context.node(), role));
+                if (logRecords.size() > maxRecords) {
+                    context.stateMachine().shutdown();
+                }
+            }
+
+            @Override
+            public void master(StateMachineContext context) {
+                this.log(context, LogEntry.Role.master);
+            }
+
+            @Override
+            public void worker(StateMachineContext context) {
+                this.log(context, LogEntry.Role.worker);
+            }
+
+            @Override
+            public void candidate(StateMachineContext context) {
+                this.log(context, LogEntry.Role.candidate);
+            }
+
+            @Override
+            public void unknown(StateMachineContext context) {
+                this.log(context, LogEntry.Role.unknown);
+            }
+
+            @Override
+            public void safe(StateMachineContext context) {
+                this.log(context, LogEntry.Role.safe);
+            }
+
+            @Override
+            public void error(StateMachineContext context, Throwable e) {
+                // Failures are not recorded in the role log.
+            }
+        };
+        final MetaDataAdapter adapter = new MetaDataAdapter() {
+            // Epoch of the most recently stored record; shared by all node threads.
+            volatile int epoch = 0;
+            Map<Integer, MetaData> data = new ConcurrentHashMap<>();
+
+            @Override
+            public boolean postDelyIfPresent(MetaData metaData, long delySecond) {
+                // The state machine passes -1; parkNanos returns at once for non-positive values.
+                LockSupport.parkNanos(delySecond * 1_000_000_000);
+                // The first writer of an epoch wins; the identity check tells the caller whether it won.
+                MetaData oldData = data.computeIfAbsent(metaData.epoch(), (key) -> {
+                    this.epoch = key;
+                    return metaData;
+                });
+                return oldData == metaData;
+            }
+
+            @Override
+            public Optional<MetaData> queryDelay(long delySecond) {
+                LockSupport.parkNanos(delySecond * 1_000_000_000);
+                return Optional.ofNullable(this.data.get(this.epoch));
+            }
+
+            @Override
+            public Optional<MetaData> query() {
+                return Optional.ofNullable(this.data.get(this.epoch));
+            }
+        };
+
+        // One thread per node; apply() returns only after that node's machine was shut down.
+        for (String node : nodes) {
+            new Thread(() -> {
+                Config config = new TestConfig(node);
+                RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+                stateMachine.apply(callback);
+                stop.countDown();
+            }).start();
+        }
+
+        // Opens once every node has left apply().
+        stop.await();
+
+        Assert.assertTrue(logRecords.size() > maxRecords);
+        Map<Integer, String> masters = new HashMap<>();
+        for (LogEntry entry: logRecords) {
+            if (entry.role == LogEntry.Role.master) {
+                // putIfAbsent returns null for the first master of an epoch, so only a
+                // previously recorded master can contradict this entry.
+                String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
+                if (lastNode != null) {
+                    Assert.assertEquals("more than one master in epoch " + entry.epoch, lastNode, entry.node);
+                }
+            }
+        }
+
+        Assert.assertTrue(masters.size() > 0);
+    }
+}