You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by va...@apache.org on 2022/08/15 10:30:36 UTC
[incubator-hugegraph] 01/01: #1869 cluster role automatic management
This is an automated email from the ASF dual-hosted git repository.
vaughn pushed a commit to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit ed3f3cacdd9f76b31175ed1ae67d20a043e5aec0
Author: zyxxoo <13...@qq.com>
AuthorDate: Mon Aug 15 18:30:17 2022 +0800
#1869 cluster role automatic management
---
.../java/com/baidu/hugegraph/election/Config.java | 14 ++
.../com/baidu/hugegraph/election/MetaData.java | 45 ++++
.../baidu/hugegraph/election/MetaDataAdapter.java | 11 +
.../election/RoleElectionStateMachine.java | 9 +
.../election/RoleElectionStateMachineImpl.java | 277 +++++++++++++++++++++
.../hugegraph/election/StateMachineCallback.java | 16 ++
.../hugegraph/election/StateMachineContext.java | 17 ++
.../core/RoleElectionStateMachineTest.java | 214 ++++++++++++++++
8 files changed, 603 insertions(+)
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
new file mode 100644
index 000000000..aa49ecc8a
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
@@ -0,0 +1,14 @@
+package com.baidu.hugegraph.election;
+
+/**
+ * Settings read by the role election state machine.
+ */
+public interface Config {
+
+ /**
+ * Identifier of the local node. The state machine compares it with the node
+ * recorded in {@link MetaData} to decide whether this node is the master.
+ */
+ String node();
+
+ /**
+ * Number of consecutive failed rounds after which the state machine switches
+ * to its safe state.
+ */
+ int exceedsFailCount();
+
+ /**
+ * Time, in milliseconds, a candidate parks before it tries to publish its
+ * own election record.
+ */
+ long randomTimeoutMillisecond();
+
+ /**
+ * Time, in seconds, that the master, worker and safe states park between two
+ * rounds.
+ */
+ long heartBeatIntervalSecond();
+
+ /**
+ * Threshold for a worker's count of rounds in which the master's record did
+ * not advance; once the count is greater than this value the worker becomes
+ * a candidate for the next epoch.
+ */
+ int exceedsWorkerCount();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java
new file mode 100644
index 000000000..684c2fb12
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java
@@ -0,0 +1,45 @@
+package com.baidu.hugegraph.election;
+
+import java.util.Objects;
+
+/**
+ * Election record: the node that claims mastership, the epoch of that claim
+ * and a counter that the master advances on every heartbeat.
+ */
+public class MetaData {
+
+    String node;
+    long count;
+    int epoch;
+
+    /**
+     * Creates a record for {@code node} in {@code epoch}; the counter starts
+     * at 1.
+     */
+    public MetaData(String node, int epoch) {
+        this.epoch = epoch;
+        this.node = node;
+        this.count = 1;
+    }
+
+    /** Advances the heartbeat counter by one. */
+    public void increaseCount() {
+        this.count += 1;
+    }
+
+    /**
+     * Tells whether {@code node} is the node stored in this record
+     * (null-safe comparison).
+     */
+    public boolean isMaster(String node) {
+        return Objects.equals(node, this.node);
+    }
+
+    /** Returns the epoch of this record. */
+    public int epoch() {
+        return this.epoch;
+    }
+
+    /** Returns the current heartbeat counter. */
+    public long count() {
+        return this.count;
+    }
+
+    /** Two records are equal when node, counter and epoch all match. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof MetaData) {
+            MetaData that = (MetaData) o;
+            return this.epoch == that.epoch &&
+                   this.count == that.count &&
+                   Objects.equals(this.node, that.node);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.node, this.count, this.epoch);
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java
new file mode 100644
index 000000000..c879c1f6d
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java
@@ -0,0 +1,11 @@
+package com.baidu.hugegraph.election;
+
+import java.util.Optional;
+
+/**
+ * Storage abstraction through which the role election state machine reads
+ * and publishes {@link MetaData}.
+ */
+public interface MetaDataAdapter {
+ /**
+ * Tries to publish {@code metaData}. The state machine treats {@code true}
+ * as "the local node holds (or keeps) the master role" and {@code false} as
+ * "the role belongs to someone else".
+ *
+ * NOTE(review): the exact storage semantics and the meaning of
+ * {@code delySecond} (presumably a delay in seconds) are defined by the
+ * implementation; the state machine always passes -1. Confirm with
+ * implementors.
+ */
+ boolean postDelyIfPresent(MetaData metaData, long delySecond);
+
+ /**
+ * Returns the current election record, if any.
+ *
+ * NOTE(review): {@code delySecond} is presumably a delay in seconds applied
+ * before the lookup; the state machine does not call this method. Confirm
+ * with implementors.
+ */
+ Optional<MetaData> queryDelay(long delySecond);
+
+ /**
+ * Returns the current election record, or an empty {@link Optional} when
+ * none exists; in the latter case the state machine starts a candidacy.
+ */
+ Optional<MetaData> query();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
new file mode 100644
index 000000000..666c836a7
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
@@ -0,0 +1,9 @@
+package com.baidu.hugegraph.election;
+
+/**
+ * State machine that determines the role of the local node and reports every
+ * role decision to a {@link StateMachineCallback}.
+ */
+public interface RoleElectionStateMachine {
+
+ /**
+ * Requests the state machine to stop.
+ */
+ void shutdown();
+
+ /**
+ * Runs the state machine, notifying {@code stateMachineCallback} of the role
+ * reached in each round and of errors.
+ */
+ void apply(StateMachineCallback stateMachineCallback);
+
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
new file mode 100644
index 000000000..b8d96026b
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
@@ -0,0 +1,277 @@
+package com.baidu.hugegraph.election;
+
+import java.util.Optional;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.util.E;
+
+/**
+ * Polling implementation of {@link RoleElectionStateMachine}.
+ *
+ * <p>{@link #apply(StateMachineCallback)} loops until {@link #shutdown()} is
+ * called. Every round asks the current state for its successor and then
+ * invokes the callback method that belongs to the new state. Election data is
+ * read and written only through the supplied {@link MetaDataAdapter}.
+ */
+public class RoleElectionStateMachineImpl implements RoleElectionStateMachine {
+
+    private volatile boolean shutdown = false;
+    private final Config config;
+
+    private volatile RoleState state;
+
+    private final MetaDataAdapter metaDataAdapter;
+
+    /**
+     * Creates a state machine that starts in the unknown state.
+     *
+     * @param config  settings of the local node
+     * @param adapter storage used to read and publish election records
+     */
+    public RoleElectionStateMachineImpl(Config config, MetaDataAdapter adapter) {
+        this.config = config;
+        this.metaDataAdapter = adapter;
+        this.state = new UnKnownState(null);
+    }
+
+    /** Makes the loop in {@link #apply(StateMachineCallback)} end after its current round. */
+    @Override
+    public void shutdown() {
+        this.shutdown = true;
+    }
+
+    /**
+     * Runs election rounds on the calling thread until {@link #shutdown()} is
+     * called.
+     *
+     * <p>Each round uses a fresh context. Anything thrown by the transition or
+     * by the role callback is passed to
+     * {@link StateMachineCallback#error(StateMachineContext, Throwable)}; once
+     * the number of consecutive failed rounds reaches
+     * {@link Config#exceedsFailCount()} the machine enters the safe state and
+     * reports it through {@link StateMachineCallback#safe(StateMachineContext)}.
+     *
+     * @param stateMachineCallback receiver of role and error notifications
+     */
+    @Override
+    public void apply(StateMachineCallback stateMachineCallback) {
+        int failCount = 0;
+        while (!this.shutdown) {
+            E.checkArgumentNotNull(this.state, "State don't be null");
+            StateMachineContextImpl context = new StateMachineContextImpl(this);
+            try {
+                this.state = this.state.transform(context);
+                Callback runnable = this.state.callback(stateMachineCallback);
+                runnable.call(context);
+                failCount = 0;
+            } catch (Throwable e) {
+                stateMachineCallback.error(context, e);
+                failCount++;
+                if (failCount >= this.config.exceedsFailCount()) {
+                    this.state = new SafeState(context.epoch());
+                    Callback runnable = this.state.callback(stateMachineCallback);
+                    runnable.call(context);
+                }
+            }
+        }
+    }
+
+    /** A role of the local node together with the rule for leaving it. */
+    private interface RoleState {
+
+        /**
+         * Performs one round for this state and returns the state to continue
+         * with (possibly {@code this}).
+         */
+        RoleState transform(StateMachineContext context);
+
+        /** Selects the method of {@code callback} that reports this state. */
+        Callback callback(StateMachineCallback callback);
+
+        /**
+         * Parks the calling thread for the configured heartbeat interval.
+         * {@link LockSupport#parkNanos(long)} may return early.
+         */
+        static void heartBeatPark(StateMachineContext context) {
+            long heartBeatIntervalSecond = context.config().heartBeatIntervalSecond();
+            LockSupport.parkNanos(heartBeatIntervalSecond * 1_000_000_000L);
+        }
+
+        /**
+         * Parks the calling thread for the configured candidate timeout.
+         * NOTE(review): despite the name no jitter is added here; the value
+         * returned by the config is used as-is.
+         */
+        static void randomPark(StateMachineContext context) {
+            long randomTimeout = context.config().randomTimeoutMillisecond();
+            LockSupport.parkNanos(randomTimeout * 1_000_000L);
+        }
+    }
+
+    /** Notification bound to one specific {@link StateMachineCallback} method. */
+    @FunctionalInterface
+    private interface Callback {
+
+        void call(StateMachineContext context);
+    }
+
+    /** The role is not known yet; the next round looks it up in the adapter. */
+    private static class UnKnownState implements RoleState {
+
+        Integer epoch;
+
+        public UnKnownState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        /**
+         * Queries the adapter. Without a record the node becomes candidate for
+         * the epoch after the last known one (or epoch 1); with a record it
+         * becomes master or worker depending on the node named in the record.
+         */
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            MetaDataAdapter adapter = context.adapter();
+            Optional<MetaData> metaDataOpt = adapter.query();
+            if (!metaDataOpt.isPresent()) {
+                context.reset();
+                return new CandidateState(this.epoch == null ? 1 : this.epoch + 1);
+            }
+
+            MetaData metaData = metaDataOpt.get();
+            context.epoch(metaData.epoch());
+            if (metaData.isMaster(context.node())) {
+                return new MasterState(metaData);
+            } else {
+                return new WorkerState(metaData);
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::unknown;
+        }
+    }
+
+    /** Entered after too many consecutive failures; waits, then looks the role up again. */
+    private static class SafeState implements RoleState {
+
+        Integer epoch;
+
+        public SafeState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            return new UnKnownState(this.epoch).transform(context);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::safe;
+        }
+    }
+
+    /** The local node is master and republishes its record every heartbeat. */
+    private static class MasterState implements RoleState {
+
+        MetaData metaData;
+
+        public MasterState(MetaData metaData) {
+            this.metaData = metaData;
+        }
+
+        /**
+         * Advances the heartbeat counter, waits one interval and republishes
+         * the record. Stays master if the adapter accepts it, otherwise falls
+         * back to the unknown state.
+         */
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            this.metaData.increaseCount();
+            RoleState.heartBeatPark(context);
+            if (context.adapter().postDelyIfPresent(this.metaData, -1)) {
+                // A fresh context is created for every round, so the epoch
+                // has to be published again; otherwise the master callback
+                // would observe a null epoch on every heartbeat.
+                context.epoch(this.metaData.epoch());
+                return this;
+            }
+            context.reset();
+            return new UnKnownState(this.metaData.epoch());
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::master;
+        }
+    }
+
+    /** The local node follows a master and watches that master's heartbeat. */
+    private static class WorkerState implements RoleState {
+
+        private MetaData metaData;
+        /** Rounds since the master's record last advanced. */
+        private int count = 0;
+
+        public WorkerState(MetaData metaData) {
+            this.metaData = metaData;
+        }
+
+        /**
+         * Waits one interval and looks the role up again. While the node is
+         * still a worker the fresh record is merged into this state; when the
+         * master has been silent for more than
+         * {@link Config#exceedsWorkerCount()} rounds the node becomes
+         * candidate for the next epoch.
+         */
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            RoleState nextState = new UnKnownState(this.metaData.epoch()).transform(context);
+            if (!(nextState instanceof WorkerState)) {
+                return nextState;
+            }
+            this.merge((WorkerState) nextState);
+            if (this.count > context.config().exceedsWorkerCount()) {
+                return new CandidateState(this.metaData.epoch() + 1);
+            }
+            return this;
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::worker;
+        }
+
+        /**
+         * Takes over the record observed by {@code state}.
+         *
+         * @throws IllegalStateException if the observed epoch, or the counter
+         *                               within the same epoch, went backwards
+         */
+        public void merge(WorkerState state) {
+            MetaData latest = state.metaData;
+            if (latest.epoch() > this.metaData.epoch()) {
+                this.count = 0;
+                this.metaData = latest;
+            } else if (latest.epoch() < this.metaData.epoch()) {
+                throw new IllegalStateException("Epoch must increase");
+            } else if (latest.count() < this.metaData.count()) {
+                throw new IllegalStateException("Meta count must increase");
+            } else if (latest.count() > this.metaData.count()) {
+                // The master made progress: forget earlier silent rounds so
+                // that only consecutive missed heartbeats trigger an election.
+                this.count = 0;
+                this.metaData = latest;
+            } else {
+                this.count++;
+            }
+        }
+    }
+
+    /** The local node tries to become master of a new epoch. */
+    private static class CandidateState implements RoleState {
+
+        Integer epoch;
+
+        public CandidateState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        /**
+         * Waits for the candidate timeout and publishes a record naming the
+         * local node. The node becomes master if the adapter accepts the
+         * record and worker otherwise.
+         */
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.randomPark(context);
+            int epoch = this.epoch == null ? 1 : this.epoch;
+            MetaData metaData = new MetaData(context.config().node(), epoch);
+            context.epoch(metaData.epoch());
+            if (context.adapter().postDelyIfPresent(metaData, -1)) {
+                // failover to master success
+                return new MasterState(metaData);
+            } else {
+                return new WorkerState(metaData);
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::candidate;
+        }
+    }
+
+    /** Per-round context handed to states and callbacks. */
+    private static class StateMachineContextImpl implements StateMachineContext {
+
+        private Integer epoch;
+        private final String node;
+        private final RoleElectionStateMachineImpl machine;
+
+        public StateMachineContextImpl(RoleElectionStateMachineImpl machine) {
+            this.node = machine.config.node();
+            this.machine = machine;
+        }
+
+        @Override
+        public Integer epoch() {
+            return this.epoch;
+        }
+
+        @Override
+        public String node() {
+            return this.node;
+        }
+
+        @Override
+        public void epoch(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public MetaDataAdapter adapter() {
+            return this.machine.adapter();
+        }
+
+        @Override
+        public Config config() {
+            return this.machine.config;
+        }
+
+        @Override
+        public RoleElectionStateMachine stateMachine() {
+            return this.machine;
+        }
+
+        /** Clears the epoch of this context. */
+        @Override
+        public void reset() {
+            this.epoch = null;
+        }
+    }
+
+    /** Returns the adapter this state machine was created with. */
+    protected MetaDataAdapter adapter() {
+        return this.metaDataAdapter;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
new file mode 100644
index 000000000..b765c455c
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
@@ -0,0 +1,16 @@
+package com.baidu.hugegraph.election;
+
+/**
+ * Receiver of the notifications emitted by a {@link RoleElectionStateMachine}.
+ * The epoch of the passed context may be null when no epoch has been
+ * established for the current round.
+ */
+public interface StateMachineCallback {
+
+ /**
+ * Called after a round that ended with the local node in the master role.
+ */
+ void master(StateMachineContext context);
+
+ /**
+ * Called after a round that ended with the local node in the worker role.
+ */
+ void worker(StateMachineContext context);
+
+ /**
+ * Called after a round that ended with the local node being a candidate.
+ */
+ void candidate(StateMachineContext context);
+
+ /**
+ * Called after a round that ended with the role of the local node unknown.
+ */
+ void unknown(StateMachineContext context);
+
+ /**
+ * Called when the state machine entered its safe state because too many
+ * consecutive rounds failed.
+ */
+ void safe(StateMachineContext context);
+
+ /**
+ * Called when a round threw {@code e}.
+ */
+ void error(StateMachineContext context, Throwable e);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
new file mode 100644
index 000000000..9b4ca07ab
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
@@ -0,0 +1,17 @@
+package com.baidu.hugegraph.election;
+
+/**
+ * Data shared between the role election state machine, its states and the
+ * {@link StateMachineCallback}.
+ */
+public interface StateMachineContext {
+ /**
+ * Returns the epoch recorded in this context, or null if none is set.
+ */
+ Integer epoch();
+
+ /**
+ * Returns the identifier of the local node.
+ */
+ String node();
+
+ /**
+ * Returns the state machine this context belongs to.
+ */
+ RoleElectionStateMachine stateMachine();
+
+ /**
+ * Records {@code epoch} in this context.
+ */
+ void epoch(Integer epoch);
+
+ /**
+ * Returns the settings of the state machine.
+ */
+ Config config();
+
+ /**
+ * Returns the adapter used to read and publish election records.
+ */
+ MetaDataAdapter adapter();
+
+ /**
+ * Clears the state recorded in this context; the implementation in
+ * RoleElectionStateMachineImpl sets the epoch to null.
+ */
+ void reset();
+}
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
new file mode 100644
index 000000000..e35952a8f
--- /dev/null
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
@@ -0,0 +1,214 @@
+package com.baidu.hugegraph.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.election.Config;
+import com.baidu.hugegraph.election.MetaData;
+import com.baidu.hugegraph.election.MetaDataAdapter;
+import com.baidu.hugegraph.election.RoleElectionStateMachine;
+import com.baidu.hugegraph.election.RoleElectionStateMachineImpl;
+import com.baidu.hugegraph.election.StateMachineCallback;
+import com.baidu.hugegraph.election.StateMachineContext;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RoleElectionStateMachineTest {
+
+ public static class LogEntry {
+
+ Integer epoch;
+ String node;
+
+ Role role;
+
+ enum Role {
+ master,
+ worker,
+ candidate,
+ safe,
+ unknown
+ }
+
+ public LogEntry(Integer epoch, String node, Role role) {
+ this.epoch = epoch;
+ this.node = node;
+ this.role = role;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof LogEntry)) return false;
+ LogEntry logEntry = (LogEntry) o;
+ return Objects.equals(epoch, logEntry.epoch) && Objects.equals(node, logEntry.node) && role == logEntry.role;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(epoch, node, role);
+ }
+
+ @Override
+ public String toString() {
+ return "LogEntry{" +
+ "epoch=" + epoch +
+ ", node='" + node + '\'' +
+ ", role=" + role +
+ '}';
+ }
+ }
+
+ private static class TestConfig implements Config {
+
+ String node;
+
+ public TestConfig(String node) {
+ this.node = node;
+ }
+
+ @Override
+ public String node() {
+ return this.node;
+ }
+
+ @Override
+ public int exceedsFailCount() {
+ return 10;
+ }
+
+ @Override
+ public long randomTimeoutMillisecond() {
+ return 400;
+ }
+
+ @Override
+ public long heartBeatIntervalSecond() {
+ return 1;
+ }
+
+ @Override
+ public int exceedsWorkerCount() {
+ return 5;
+ }
+ }
+
+ @Test
+ public void testStateMachine() throws InterruptedException {
+ final CountDownLatch stop = new CountDownLatch(3);
+ final List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(20));
+ final StateMachineCallback callback = new StateMachineCallback() {
+
+ @Override
+ public void master(StateMachineContext context) {
+ Integer epochId = context.epoch();
+ String node = context.node();
+ logRecords.add(new LogEntry(epochId, node, LogEntry.Role.master));
+ }
+
+ @Override
+ public void worker(StateMachineContext context) {
+ Integer epochId = context.epoch();
+ String node = context.node();
+ logRecords.add(new LogEntry(epochId, node, LogEntry.Role.worker));
+ }
+
+ @Override
+ public void candidate(StateMachineContext context) {
+ Integer epochId = context.epoch();
+ String node = context.node();
+ logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate));
+ }
+
+ @Override
+ public void unknown(StateMachineContext context) {
+ Integer epochId = context.epoch();
+ String node = context.node();
+ logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate));
+ }
+
+ @Override
+ public void safe(StateMachineContext context) {
+ Integer epochId = context.epoch();
+ String node = context.node();
+ logRecords.add(new LogEntry(epochId, node, LogEntry.Role.safe));
+ }
+
+ @Override
+ public void error(StateMachineContext context, Throwable e) {
+
+ }
+ };
+ final MetaDataAdapter adapter = new MetaDataAdapter() {
+ int epoch = 0;
+ Map<Integer, MetaData> data = new ConcurrentHashMap<>();
+ @Override
+ public boolean postDelyIfPresent(MetaData metaData, long delySecond) {
+ LockSupport.parkNanos(delySecond * 1_000_000_000);
+ MetaData oldData = data.computeIfAbsent(metaData.epoch(), (key) -> {
+ this.epoch = key;
+ return metaData;
+ });
+ return oldData == metaData;
+ }
+
+ @Override
+ public Optional<MetaData> queryDelay(long delySecond) {
+ LockSupport.parkNanos(delySecond * 1_000_000_000);
+ return Optional.ofNullable(this.data.get(this.epoch));
+ }
+
+ @Override
+ public Optional<MetaData> query() {
+ return Optional.ofNullable(this.data.get(this.epoch));
+ }
+ };
+
+ Thread node1 = new Thread(() -> {
+ Config config = new TestConfig("1");
+ RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+ stateMachine.apply(callback);
+ stop.countDown();
+ });
+
+ Thread node2 = new Thread(() -> {
+ Config config = new TestConfig("2");
+ RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+ stateMachine.apply(callback);
+ stop.countDown();
+ });
+
+ Thread node3 = new Thread(() -> {
+ Config config = new TestConfig("3");
+ RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+ stateMachine.apply(callback);
+ stop.countDown();
+ });
+
+ node1.start();
+ node2.start();
+ node3.start();
+
+ stop.await();
+
+ Assert.assertTrue(logRecords.size() > 100);
+ Map<Integer, String> masters = new HashMap<>();
+ for (LogEntry entry: logRecords) {
+ if (entry.role == LogEntry.Role.master) {
+ String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
+ Assert.assertEquals(lastNode, entry.node);
+ }
+ }
+
+ Assert.assertTrue(masters.size() > 0);
+ }
+}