You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by va...@apache.org on 2022/09/16 02:53:15 UTC
[incubator-hugegraph] branch master updated: feat(core): cluster role automatic management (#1943)
This is an automated email from the ASF dual-hosted git repository.
vaughn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new 51d29ac3a feat(core): cluster role automatic management (#1943)
51d29ac3a is described below
commit 51d29ac3a0ec04c43c99e5262655d344d873fc1d
Author: zyxxoo <13...@qq.com>
AuthorDate: Fri Sep 16 10:53:06 2022 +0800
feat(core): cluster role automatic management (#1943)
* #1869 cluster role automatic management
* improve code
* FIX bug
* improve code
* improve code
* improve code
* improve code
* improve code
* improve code
---
.../java/com/baidu/hugegraph/election/Config.java | 35 +++
.../election/RoleElectionStateMachine.java | 27 ++
.../election/RoleElectionStateMachineImpl.java | 312 ++++++++++++++++++++
.../com/baidu/hugegraph/election/RoleTypeData.java | 91 ++++++
.../hugegraph/election/RoleTypeDataAdapter.java | 29 ++
.../hugegraph/election/StateMachineCallback.java | 35 +++
.../hugegraph/election/StateMachineContext.java | 37 +++
.../com/baidu/hugegraph/core/CoreTestSuite.java | 3 +-
.../core/RoleElectionStateMachineTest.java | 318 +++++++++++++++++++++
9 files changed, 886 insertions(+), 1 deletion(-)
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
new file mode 100644
index 000000000..ad39c0b5e
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+/**
+ * Tuning knobs of the role election state machine. Timeouts are expressed in
+ * the unit named by each method.
+ */
+public interface Config {
+
+    /**
+     * Identifier of the local node. It is written into the master record when
+     * this node claims the master role, and compared against that record to
+     * decide whether this node is currently the master.
+     */
+    String node();
+
+    /**
+     * Fixed part, in milliseconds, of the wait a candidate performs before it
+     * tries to claim the master role.
+     */
+    long baseTimeoutMillisecond();
+
+    /**
+     * Upper bound, in milliseconds, of the random extra wait added on top of
+     * {@link #baseTimeoutMillisecond()} before a candidate claims the master
+     * role (picked in steps of a tenth of this value).
+     */
+    long randomTimeoutMillisecond();
+
+    /**
+     * Interval, in seconds, that masters, workers and abdicating nodes park
+     * between two rounds.
+     */
+    long heartBeatIntervalSecond();
+
+    /**
+     * Number of consecutive failed rounds after which the node switches to
+     * the abdication state.
+     */
+    int exceedsFailCount();
+
+    /**
+     * A worker becomes a candidate once it has seen the master record
+     * unchanged for more than this many consecutive rounds.
+     */
+    int exceedsWorkerCount();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
new file mode 100644
index 000000000..4bc258623
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+/**
+ * Drives the election of a master among cluster nodes and reports the role
+ * the local node takes in each round.
+ */
+public interface RoleElectionStateMachine {
+
+    /**
+     * Starts the election loop and notifies {@code stateMachineCallback} of
+     * every role this node enters. The standard implementation runs the loop
+     * on the calling thread until {@link #shutdown()} is requested.
+     */
+    void apply(StateMachineCallback stateMachineCallback);
+
+    /**
+     * Requests that the loop started by {@link #apply} stops.
+     */
+    void shutdown();
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
new file mode 100644
index 000000000..016f51cb8
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+import java.security.SecureRandom;
+import java.util.Optional;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.util.E;
+
+/**
+ * Default {@link RoleElectionStateMachine}. A node moves between the unknown,
+ * candidate, master, worker and abdication roles, using a
+ * {@link RoleTypeDataAdapter} as the shared record of which node is master.
+ *
+ * <p>{@link #apply} runs on the caller's thread; {@link #shutdown()} only
+ * writes a volatile flag and may be called from another thread.
+ */
+public class RoleElectionStateMachineImpl implements RoleElectionStateMachine {
+
+    private volatile boolean shutdown;
+    private final Config config;
+    private volatile RoleState state;
+    private final RoleTypeDataAdapter roleTypeDataAdapter;
+
+    public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) {
+        this.config = config;
+        this.roleTypeDataAdapter = adapter;
+        this.state = new UnknownState(null);
+        this.shutdown = false;
+    }
+
+    /**
+     * Requests {@link #apply} to stop. Only a flag is set; the loop checks it
+     * before each round, so a round already in progress (possibly parked for
+     * a heartbeat or an election timeout) still runs to completion.
+     */
+    @Override
+    public void shutdown() {
+        this.shutdown = true;
+    }
+
+    /**
+     * Runs the election loop on the calling thread until {@link #shutdown()}
+     * is requested. Each round transforms the current state and then invokes
+     * the callback of the resulting state. Any failure is reported to
+     * {@link StateMachineCallback#error}; once {@link Config#exceedsFailCount()}
+     * rounds in a row have failed, the node switches to the abdication state.
+     */
+    @Override
+    public void apply(StateMachineCallback stateMachineCallback) {
+        int failCount = 0;
+        StateMachineContextImpl context = new StateMachineContextImpl(this);
+        while (!this.shutdown) {
+            E.checkArgumentNotNull(this.state, "State don't be null");
+            try {
+                this.state = this.state.transform(context);
+                Callback runnable = this.state.callback(stateMachineCallback);
+                runnable.call(context);
+                failCount = 0;
+            } catch (Throwable e) {
+                stateMachineCallback.error(context, e);
+                failCount++;
+                if (failCount >= this.config.exceedsFailCount()) {
+                    this.abdicate(stateMachineCallback, context);
+                }
+            }
+        }
+    }
+
+    /**
+     * Switches to {@link AbdicationState} and notifies the callback.
+     * This runs from the failure handler of {@link #apply}, outside the
+     * round's try block, so a throwing abdication callback is reported via
+     * {@link StateMachineCallback#error} here instead of escaping
+     * {@link #apply} and silently ending the election loop.
+     */
+    private void abdicate(StateMachineCallback callback,
+                          StateMachineContext context) {
+        this.state = new AbdicationState(context.epoch());
+        try {
+            this.state.callback(callback).call(context);
+        } catch (Throwable e) {
+            callback.error(context, e);
+        }
+    }
+
+    /**
+     * One role of the state machine. {@link #transform} performs the role's
+     * work for a single round and returns the state for the next round;
+     * {@link #callback} selects the user callback matching this role.
+     */
+    private interface RoleState {
+
+        SecureRandom SECURE_RANDOM = new SecureRandom();
+
+        RoleState transform(StateMachineContext context);
+
+        Callback callback(StateMachineCallback callback);
+
+        /** Parks the current thread for about one heartbeat interval. */
+        static void heartBeatPark(StateMachineContext context) {
+            long heartBeatIntervalSecond = context.config().heartBeatIntervalSecond();
+            LockSupport.parkNanos(heartBeatIntervalSecond * 1_000_000_000L);
+        }
+
+        /**
+         * Parks for the base timeout plus 0%, 10%, ..., 100% (chosen at
+         * random) of the random timeout, presumably so that competing
+         * candidates do not retry in lock-step.
+         */
+        static void randomPark(StateMachineContext context) {
+            long randomTimeout = context.config().randomTimeoutMillisecond();
+            long baseTime = context.config().baseTimeoutMillisecond();
+            long timeout = (long) (baseTime + (randomTimeout / 10.0 * SECURE_RANDOM.nextInt(11)));
+            LockSupport.parkNanos(timeout * 1_000_000L);
+        }
+    }
+
+    @FunctionalInterface
+    private interface Callback {
+
+        void call(StateMachineContext context);
+    }
+
+    /**
+     * Re-evaluation state: reads the shared record and decides whether this
+     * node is master, worker, or has to stand for election.
+     */
+    private static class UnknownState implements RoleState {
+
+        // Epoch this node last knew about; null before the first election.
+        final Integer epoch;
+
+        public UnknownState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleTypeDataAdapter adapter = context.adapter();
+            Optional<RoleTypeData> roleTypeDataOpt = adapter.query();
+            if (!roleTypeDataOpt.isPresent()) {
+                // No master recorded yet: stand for the next epoch.
+                context.reset();
+                Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1;
+                context.epoch(nextEpoch);
+                return new CandidateState(nextEpoch);
+            }
+
+            RoleTypeData roleTypeData = roleTypeDataOpt.get();
+            if (this.epoch != null && roleTypeData.epoch() < this.epoch) {
+                // The stored record is older than what this node has seen.
+                context.reset();
+                Integer nextEpoch = this.epoch + 1;
+                context.epoch(nextEpoch);
+                return new CandidateState(nextEpoch);
+            }
+
+            context.epoch(roleTypeData.epoch());
+            if (roleTypeData.isMaster(context.node())) {
+                return new MasterState(roleTypeData);
+            } else {
+                return new WorkerState(roleTypeData);
+            }
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::unknown;
+        }
+    }
+
+    /**
+     * Entered after too many consecutive failures: waits one heartbeat and
+     * then re-evaluates the role from the shared record.
+     */
+    private static class AbdicationState implements RoleState {
+
+        private final Integer epoch;
+
+        public AbdicationState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            return new UnknownState(this.epoch).transform(context);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::abdication;
+        }
+    }
+
+    /**
+     * Holds the master role. Each round bumps the record's clock, waits one
+     * heartbeat and writes the record back. If the adapter rejects the write,
+     * the node treats the role as lost and re-evaluates from
+     * {@link UnknownState}.
+     */
+    private static class MasterState implements RoleState {
+
+        private final RoleTypeData roleTypeData;
+
+        public MasterState(RoleTypeData roleTypeData) {
+            this.roleTypeData = roleTypeData;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            this.roleTypeData.increaseClock();
+            RoleState.heartBeatPark(context);
+            if (context.adapter().updateIfNodePresent(this.roleTypeData)) {
+                return this;
+            }
+            context.reset();
+            context.epoch(this.roleTypeData.epoch());
+            return new UnknownState(this.roleTypeData.epoch()).transform(context);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::master;
+        }
+    }
+
+    /**
+     * Follows an existing master. Each round waits one heartbeat and re-reads
+     * the shared record. If the record stays unchanged (same epoch and clock)
+     * for more than {@link Config#exceedsWorkerCount()} rounds, the master is
+     * treated as gone and this node stands for the next epoch.
+     */
+    private static class WorkerState implements RoleState {
+
+        private RoleTypeData roleTypeData;
+        // Consecutive rounds in which the master record did not change.
+        private int clock;
+
+        public WorkerState(RoleTypeData roleTypeData) {
+            this.roleTypeData = roleTypeData;
+            this.clock = 0;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.heartBeatPark(context);
+            RoleState nextState = new UnknownState(this.roleTypeData.epoch()).transform(context);
+            if (!(nextState instanceof WorkerState)) {
+                return nextState;
+            }
+            this.merge((WorkerState) nextState);
+            if (this.clock <= context.config().exceedsWorkerCount()) {
+                return this;
+            }
+            // Publish the contested epoch before the candidate callback runs,
+            // as UnknownState does on its candidate paths; otherwise the
+            // callback would still observe the previous epoch.
+            int nextEpoch = this.roleTypeData.epoch() + 1;
+            context.epoch(nextEpoch);
+            return new CandidateState(nextEpoch);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::worker;
+        }
+
+        /**
+         * Folds a freshly read worker view into this one. A newer epoch, or a
+         * newer clock within the same epoch, replaces the stored record and
+         * resets the unchanged-round counter; an identical epoch and clock
+         * increments the counter.
+         *
+         * @throws IllegalStateException if the read record's epoch, or its
+         *         clock within the same epoch, went backwards
+         */
+        public void merge(WorkerState state) {
+            RoleTypeData latest = state.roleTypeData;
+            int epochOrder = Integer.compare(latest.epoch(), this.roleTypeData.epoch());
+            if (epochOrder < 0) {
+                throw new IllegalStateException("Epoch must increase");
+            }
+            if (epochOrder == 0) {
+                int clockOrder = Long.compare(latest.clock(), this.roleTypeData.clock());
+                if (clockOrder < 0) {
+                    throw new IllegalStateException("Clock must increase");
+                }
+                if (clockOrder == 0) {
+                    // Record unchanged since the previous round.
+                    this.clock++;
+                    return;
+                }
+            }
+            this.clock = 0;
+            this.roleTypeData = latest;
+        }
+    }
+
+    /**
+     * Stands for election in the given epoch: waits a randomized timeout and
+     * then tries to write a master record naming this node.
+     */
+    private static class CandidateState implements RoleState {
+
+        private final Integer epoch;
+
+        public CandidateState(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleState transform(StateMachineContext context) {
+            RoleState.randomPark(context);
+            int epoch = this.epoch == null ? 1 : this.epoch;
+            RoleTypeData roleTypeData = new RoleTypeData(context.config().node(), epoch);
+            context.epoch(roleTypeData.epoch());
+            if (context.adapter().updateIfNodePresent(roleTypeData)) {
+                // Failover to master succeeded.
+                return new MasterState(roleTypeData);
+            }
+            return new UnknownState(epoch).transform(context);
+        }
+
+        @Override
+        public Callback callback(StateMachineCallback callback) {
+            return callback::candidate;
+        }
+    }
+
+    /**
+     * Per-{@link #apply} context handed to states and callbacks. It is only
+     * touched by the thread running {@link #apply}.
+     */
+    private static class StateMachineContextImpl implements StateMachineContext {
+
+        private Integer epoch;
+        private final String node;
+        private final RoleElectionStateMachineImpl machine;
+
+        public StateMachineContextImpl(RoleElectionStateMachineImpl machine) {
+            this.node = machine.config.node();
+            this.machine = machine;
+        }
+
+        @Override
+        public Integer epoch() {
+            return this.epoch;
+        }
+
+        @Override
+        public String node() {
+            return this.node;
+        }
+
+        @Override
+        public void epoch(Integer epoch) {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public RoleTypeDataAdapter adapter() {
+            return this.machine.adapter();
+        }
+
+        @Override
+        public Config config() {
+            return this.machine.config;
+        }
+
+        @Override
+        public RoleElectionStateMachine stateMachine() {
+            return this.machine;
+        }
+
+        @Override
+        public void reset() {
+            this.epoch = null;
+        }
+    }
+
+    protected RoleTypeDataAdapter adapter() {
+        return this.roleTypeDataAdapter;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java
new file mode 100644
index 000000000..aa60c47ac
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+import java.util.Objects;
+
+/**
+ * Master record exchanged through {@link RoleTypeDataAdapter}: the master's
+ * node id, the election epoch, and a clock that the master increments before
+ * each heartbeat write.
+ *
+ * <p>{@link #equals} and {@link #hashCode} include the mutable clock, so an
+ * instance must not be mutated while it is used as a hash key.
+ */
+public class RoleTypeData {
+
+    private final String node;
+    private long clock;
+    private final int epoch;
+
+    /** Creates a record for {@code node} in {@code epoch} with clock 1. */
+    public RoleTypeData(String node, int epoch) {
+        this(node, epoch, 1);
+    }
+
+    public RoleTypeData(String node, int epoch, long clock) {
+        this.node = node;
+        this.epoch = epoch;
+        this.clock = clock;
+    }
+
+    /** Advances the clock by one. */
+    public void increaseClock() {
+        this.clock++;
+    }
+
+    /** Returns whether {@code node} is the node named by this record. */
+    public boolean isMaster(String node) {
+        return Objects.equals(this.node, node);
+    }
+
+    public int epoch() {
+        return this.epoch;
+    }
+
+    public long clock() {
+        return this.clock;
+    }
+
+    public void clock(long clock) {
+        this.clock = clock;
+    }
+
+    public String node() {
+        return this.node;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof RoleTypeData)) {
+            return false;
+        }
+        RoleTypeData metaData = (RoleTypeData) obj;
+        return this.clock == metaData.clock &&
+               this.epoch == metaData.epoch &&
+               Objects.equals(this.node, metaData.node);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.node, this.clock, this.epoch);
+    }
+
+    @Override
+    public String toString() {
+        return "RoleTypeData{" +
+               "node='" + this.node + '\'' +
+               ", clock=" + this.clock +
+               ", epoch=" + this.epoch +
+               '}';
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java
new file mode 100644
index 000000000..41a2021f0
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+import java.util.Optional;
+
+/**
+ * Access to the shared master record used by the role election state machine.
+ */
+public interface RoleTypeDataAdapter {
+
+    /**
+     * Reads the current master record.
+     *
+     * @return the record, or empty when none has been stored; the state
+     *         machine starts an election when this is empty
+     */
+    Optional<RoleTypeData> query();
+
+    /**
+     * Conditionally stores {@code stateData}. The state machine calls this
+     * both to renew its own master record and to claim the master role as a
+     * candidate.
+     *
+     * @return {@code true} if the record was accepted; the state machine
+     *         treats this as holding (or gaining) the master role for the
+     *         record's epoch
+     */
+    boolean updateIfNodePresent(RoleTypeData stateData);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
new file mode 100644
index 000000000..8403b5950
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+/**
+ * Observer of the role election state machine. After each round, the method
+ * matching the role the node ended up in is invoked on the state machine's
+ * thread.
+ */
+public interface StateMachineCallback {
+
+    /** The node is re-evaluating its role from the shared record. */
+    void unknown(StateMachineContext context);
+
+    /** The node is standing for election. */
+    void candidate(StateMachineContext context);
+
+    /** The node holds the master role; invoked every round it keeps it. */
+    void master(StateMachineContext context);
+
+    /** The node follows another master; invoked every round. */
+    void worker(StateMachineContext context);
+
+    /** The node gave up its role after too many consecutive failures. */
+    void abdication(StateMachineContext context);
+
+    /**
+     * A round failed with {@code e}, thrown either by a state transition or
+     * by another callback.
+     */
+    void error(StateMachineContext context, Throwable e);
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
new file mode 100644
index 000000000..a3693f5fa
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.election;
+
+/**
+ * Context passed to the state machine's states and callbacks.
+ */
+public interface StateMachineContext {
+
+    /**
+     * Epoch the state machine currently associates with this node; may be
+     * {@code null} (e.g. after {@link #reset()} in the default implementation).
+     */
+    Integer epoch();
+
+    /** Updates the epoch returned by {@link #epoch()}. */
+    void epoch(Integer epoch);
+
+    /** Clears the per-election state (the epoch, in the default implementation). */
+    void reset();
+
+    /** Identifier of the local node. */
+    String node();
+
+    /** Configuration of the state machine. */
+    Config config();
+
+    /** Adapter used to read and write the shared master record. */
+    RoleTypeDataAdapter adapter();
+
+    /** The state machine that owns this context, e.g. to shut it down. */
+    RoleElectionStateMachine stateMachine();
+}
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java
index 509cdf814..d6dd4c350 100644
--- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java
@@ -49,7 +49,8 @@ import com.baidu.hugegraph.util.Log;
TaskCoreTest.class,
AuthTest.class,
MultiGraphsTest.class,
- RamTableTest.class
+ RamTableTest.class,
+ RoleElectionStateMachineTest.class
})
public class CoreTestSuite {
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
new file mode 100644
index 000000000..6e8af972d
--- /dev/null
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.core;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.LockSupport;
+
+import com.baidu.hugegraph.election.Config;
+import com.baidu.hugegraph.election.RoleElectionStateMachine;
+import com.baidu.hugegraph.election.RoleElectionStateMachineImpl;
+import com.baidu.hugegraph.election.RoleTypeData;
+import com.baidu.hugegraph.election.RoleTypeDataAdapter;
+import com.baidu.hugegraph.election.StateMachineCallback;
+import com.baidu.hugegraph.election.StateMachineContext;
+import com.baidu.hugegraph.testutil.Assert;
+import org.junit.Test;
+
+public class RoleElectionStateMachineTest {
+
+    public static class LogEntry {
+
+        Integer epoch;
+
+        String node;
+
+        Role role;
+
+        enum Role {
+            master,
+            worker,
+            candidate,
+            abdication,
+            unknown
+        }
+
+        public LogEntry(Integer epoch, String node, Role role) {
+            this.epoch = epoch;
+            this.node = node;
+            this.role = role;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof LogEntry)) {
+                return false;
+            }
+            LogEntry logEntry = (LogEntry) obj;
+            return Objects.equals(epoch, logEntry.epoch) &&
+                   Objects.equals(node, logEntry.node) && role == logEntry.role;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(epoch, node, role);
+        }
+
+        @Override
+        public String toString() {
+            return "LogEntry{" +
+                   "epoch=" + epoch +
+                   ", node='" + node + '\'' +
+                   ", role=" + role +
+                   '}';
+        }
+    }
+
+    private static class TestConfig implements Config {
+
+        String node;
+
+        public TestConfig(String node) {
+            this.node = node;
+        }
+
+        @Override
+        public String node() {
+            return this.node;
+        }
+
+        @Override
+        public int exceedsFailCount() {
+            return 2;
+        }
+
+        @Override
+        public long randomTimeoutMillisecond() {
+            return 400;
+        }
+
+        @Override
+        public long heartBeatIntervalSecond() {
+            return 1;
+        }
+
+        @Override
+        public int exceedsWorkerCount() {
+            return 5;
+        }
+
+        @Override
+        public long baseTimeoutMillisecond() {
+            return 100;
+        }
+    }
+
+    @Test
+    public void testStateMachine() throws InterruptedException {
+        final CountDownLatch stop = new CountDownLatch(4);
+        final int MAX_COUNT = 200;
+        final List<LogEntry> logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
+        final List<String> masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT));
+        // apply() catches every Throwable and only forwards it to error(), so
+        // an assertion failing inside the adapter or a callback would just be
+        // printed. Collect such failures and fail the test at the end instead.
+        final List<Throwable> assertionErrors = Collections.synchronizedList(new ArrayList<>());
+        final StateMachineCallback callback = new StateMachineCallback() {
+
+            // Logs one role report; once the shared log exceeds MAX_COUNT,
+            // each machine shuts itself down on its next report.
+            private void record(StateMachineContext context, LogEntry.Role role) {
+                logRecords.add(new LogEntry(context.epoch(), context.node(), role));
+                if (logRecords.size() > MAX_COUNT) {
+                    context.stateMachine().shutdown();
+                }
+            }
+
+            @Override
+            public void master(StateMachineContext context) {
+                this.record(context, LogEntry.Role.master);
+                String node = context.node();
+                System.out.println("master node: " + node);
+                masterNodes.add(node);
+            }
+
+            @Override
+            public void worker(StateMachineContext context) {
+                this.record(context, LogEntry.Role.worker);
+            }
+
+            @Override
+            public void candidate(StateMachineContext context) {
+                this.record(context, LogEntry.Role.candidate);
+            }
+
+            @Override
+            public void unknown(StateMachineContext context) {
+                this.record(context, LogEntry.Role.unknown);
+            }
+
+            @Override
+            public void abdication(StateMachineContext context) {
+                this.record(context, LogEntry.Role.abdication);
+            }
+
+            @Override
+            public void error(StateMachineContext context, Throwable e) {
+                System.out.println("state machine error: node " + context.node() + " message " + e.getMessage());
+                if (e instanceof AssertionError) {
+                    assertionErrors.add(e);
+                }
+            }
+        };
+
+        final List<RoleTypeData> metaDataLogs = Collections.synchronizedList(new ArrayList<>(100));
+        // In-memory stand-in for the shared store. Both methods are
+        // synchronized so that the epoch check, the epoch bump and the map
+        // write happen atomically, like a compare-and-set in a real backend;
+        // without that, two racing new-epoch writers could regress `epoch`.
+        final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() {
+
+            volatile int epoch = 0;
+
+            final Map<Integer, RoleTypeData> data = new ConcurrentHashMap<>();
+
+            RoleTypeData copy(RoleTypeData stateData) {
+                if (stateData == null) {
+                    return null;
+                }
+                return new RoleTypeData(stateData.node(), stateData.epoch(), stateData.clock());
+            }
+
+            @Override
+            public synchronized boolean updateIfNodePresent(RoleTypeData stateData) {
+                if (stateData.epoch() < this.epoch) {
+                    return false;
+                }
+
+                RoleTypeData copy = this.copy(stateData);
+                RoleTypeData newData = data.compute(copy.epoch(), (key, value) -> {
+                    if (copy.epoch() > this.epoch) {
+                        this.epoch = copy.epoch();
+                        Assert.assertNull(value);
+                        metaDataLogs.add(copy);
+                        System.out.println("The node " + copy + " become new master:");
+                        return copy;
+                    }
+
+                    Assert.assertEquals(value.epoch(), copy.epoch());
+                    if (Objects.equals(value.node(), copy.node()) &&
+                        value.clock() <= copy.clock()) {
+                        System.out.println("The master node " + copy + " keep heartbeat");
+                        metaDataLogs.add(copy);
+                        if (value.clock() == copy.clock()) {
+                            Assert.fail("Clock must increase when same epoch and node id");
+                        }
+                        return copy;
+                    }
+                    return value;
+                });
+                return Objects.equals(newData, copy);
+            }
+
+            @Override
+            public synchronized Optional<RoleTypeData> query() {
+                return Optional.ofNullable(this.copy(this.data.get(this.epoch)));
+            }
+        };
+
+        // Index i holds the machine of node "i"; slot 0 is unused.
+        RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4];
+        List<Thread> nodes = new ArrayList<>(3);
+        for (int i = 1; i <= 3; i++) {
+            final int id = i;
+            nodes.add(new Thread(() -> {
+                try {
+                    Config config = new TestConfig(String.valueOf(id));
+                    RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter);
+                    machines[id] = stateMachine;
+                    stateMachine.apply(callback);
+                } finally {
+                    // Always release the latch so await() cannot hang if
+                    // apply() exits with an exception.
+                    stop.countDown();
+                }
+            }));
+        }
+        for (Thread node : nodes) {
+            node.start();
+        }
+
+        Thread randomShutdown = new Thread(() -> {
+            Set<String> dropNodes = new HashSet<>();
+            while (dropNodes.size() < 3) {
+                LockSupport.parkNanos(5_000_000_000L);
+                int size = masterNodes.size();
+                if (size < 1) {
+                    continue;
+                }
+                String node = masterNodes.get(size - 1);
+                if (dropNodes.contains(node)) {
+                    continue;
+                }
+                machines[Integer.parseInt(node)].shutdown();
+                dropNodes.add(node);
+                System.out.println("----shutdown machine " + node);
+            }
+            stop.countDown();
+        });
+
+        randomShutdown.start();
+        stop.await();
+
+        Assert.assertTrue("Assertions failed inside state machines: " + assertionErrors,
+                          assertionErrors.isEmpty());
+        Assert.assertGt(0, logRecords.size());
+        // At most one node may ever be reported as master for a given epoch.
+        Map<Integer, String> masters = new HashMap<>();
+        for (LogEntry entry : logRecords) {
+            if (entry.role == LogEntry.Role.master) {
+                String lastNode = masters.putIfAbsent(entry.epoch, entry.node);
+                if (lastNode != null) {
+                    Assert.assertEquals(lastNode, entry.node);
+                }
+            }
+        }
+
+        Assert.assertGt(0, masters.size());
+    }
+}