You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/03 07:09:42 UTC
[iotdb] branch master updated: [IOTDB-3076]Optimize StandAloneConsensus read/write performance && consensus modu… (#5768)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5e6274c11e [IOTDB-3076]Optimize StandAloneConsensus read/write performance && consensus modu… (#5768)
5e6274c11e is described below
commit 5e6274c11e3f9fe99df03200ebd4588917456d29
Author: Potato <TX...@gmail.com>
AuthorDate: Tue May 3 15:09:37 2022 +0800
[IOTDB-3076]Optimize StandAloneConsensus read/write performance && consensus modu… (#5768)
* Optimize StandAloneConsensus read/write performance && consensus module code refactor
* refactor
---
.../statemachine/PartitionRegionStateMachine.java | 2 +-
.../apache/iotdb/consensus/ConsensusFactory.java | 1 -
.../{statemachine => }/IStateMachine.java | 5 +-
.../exception/IllegalPeerEndpointException.java | 32 ++++++
.../ratis/ApplicationStateMachineProxy.java | 2 +-
.../iotdb/consensus/ratis/RatisConsensus.java | 2 +-
.../iotdb/consensus/ratis/RequestMessage.java | 4 -
.../iotdb/consensus/ratis/SnapshotStorage.java | 4 +-
.../org/apache/iotdb/consensus/ratis/Utils.java | 49 +--------
.../consensus/standalone/StandAloneConsensus.java | 116 ++++++++++-----------
.../consensus/standalone/StandAloneServerImpl.java | 14 ++-
.../apache/iotdb/consensus}/EmptyStateMachine.java | 2 +-
.../apache/iotdb/consensus/ratis/TestUtils.java | 2 +-
.../iotdb/consensus/standalone/RecoveryTest.java | 4 +-
.../standalone/StandAloneConsensusTest.java | 18 +++-
.../iotdb/commons/consensus/ConsensusGroupId.java | 82 ++++++++-------
.../iotdb/commons/consensus/DataRegionId.java | 39 +------
.../iotdb/commons/consensus/PartitionRegionId.java | 39 +------
.../iotdb/commons/consensus/SchemaRegionId.java | 39 +------
.../apache/iotdb/commons/ConsensusGroupIdTest.java | 4 +-
.../consensus/statemachine/BaseStateMachine.java | 2 +-
.../service/thrift/impl/InternalServiceImpl.java | 2 +-
.../iotdb/db/service/InternalServiceImplTest.java | 4 +-
23 files changed, 180 insertions(+), 288 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index fa14e859f0..e429f61f20 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
import org.apache.iotdb.confignode.service.executor.ConfigRequestExecutor;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index d9364ad13e..efc40e31e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.consensus;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
similarity index 97%
rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index aab3e17bcd..c090fa2a35 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.statemachine;
+package org.apache.iotdb.consensus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -25,10 +25,13 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import javax.annotation.concurrent.ThreadSafe;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.function.Function;
+@ThreadSafe
public interface IStateMachine {
interface Registry extends Function<ConsensusGroupId, IStateMachine> {}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java
new file mode 100644
index 0000000000..7591345acc
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerEndpointException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+public class IllegalPeerEndpointException extends ConsensusException {
+
+ public IllegalPeerEndpointException(TEndPoint currentNode, TEndPoint newNode) {
+ super(
+ String.format(
+ "Illegal creation for node %s in node %s in StandAloneConsensus Mode",
+ newNode, currentNode));
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 53113293a0..317029a740 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d022f096cc..f8d65fa0b0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -39,7 +40,6 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
index 9dc73d815c..242573e5be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java
@@ -23,15 +23,11 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
public class RequestMessage implements Message {
- private final Logger logger = LoggerFactory.getLogger(RequestMessage.class);
-
private final IConsensusRequest actualRequest;
private volatile ByteString serializedContent;
private static final int DEFAULT_BUFFER_SIZE = 2048 * 10;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 2fbc7916ca..c50a4b3446 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.consensus.ratis;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.SnapshotMeta;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.server.protocol.TermIndex;
@@ -46,7 +46,7 @@ import java.util.List;
* hopefully will be introduced in Ratis 2.3.0.
*/
public class SnapshotStorage implements StateMachineStorage {
- private IStateMachine applicationStateMachine;
+ private final IStateMachine applicationStateMachine;
private File stateMachineDir;
private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 5ec86b994b..c0683572a9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -21,9 +21,6 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.consensus.PartitionRegionId;
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.ratis.protocol.RaftGroupId;
@@ -41,9 +38,6 @@ import java.nio.charset.Charset;
public class Utils {
private static final int tempBufferSize = 1024;
private static final byte PADDING_MAGIC = 0x47;
- private static final long DataRegionType = 0x01;
- private static final long SchemaRegionType = 0x02;
- private static final long PartitionRegionType = 0x03;
public static String IPAddress(TEndPoint endpoint) {
return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
@@ -52,30 +46,9 @@ public class Utils {
/** Encode the ConsensusGroupId into 6 bytes 2 Bytes for Group Type 4 Bytes for Group ID */
public static long groupEncode(ConsensusGroupId consensusGroupId) {
// use abbreviations to prevent overflow
- long groupType = 0L;
- switch (consensusGroupId.getType()) {
- case DataRegion:
- {
- groupType = DataRegionType;
- break;
- }
- case SchemaRegion:
- {
- groupType = SchemaRegionType;
- break;
- }
- case PartitionRegion:
- {
- groupType = PartitionRegionType;
- break;
- }
- default:
- {
- return -1;
- }
- }
+ long groupType = consensusGroupId.getType().getValue();
long groupCode = groupType << 32;
- groupCode += (long) consensusGroupId.getId();
+ groupCode += consensusGroupId.getId();
return groupCode;
}
@@ -127,20 +100,7 @@ public class Utils {
ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES);
byteBuffer.put(padded, 12, 4);
byteBuffer.flip();
- int gid = byteBuffer.getInt();
- ConsensusGroupId id;
-
- if (type == DataRegionType) {
- id = new DataRegionId(gid);
- } else if (type == PartitionRegionType) {
- id = new PartitionRegionId(gid);
- } else if (type == SchemaRegionType) {
- id = new SchemaRegionId(gid);
- } else {
- throw new IllegalArgumentException(
- String.format("Unexpected consensusGroupId Type %d", type));
- }
- return id;
+ return ConsensusGroupId.Factory.create((int) type, byteBuffer.getInt());
}
public static ByteBuffer serializeTSStatus(TSStatus status) throws TException {
@@ -162,8 +122,7 @@ public class Utils {
public static ByteBuffer getMetadataFromTermIndex(TermIndex termIndex) {
String ordinal = String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex());
- ByteBuffer metadata = ByteBuffer.wrap(ordinal.getBytes());
- return metadata;
+ return ByteBuffer.wrap(ordinal.getBytes());
}
public static TermIndex getTermIndexFromMetadata(ByteBuffer metadata) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index f0d15221f2..4c2100d1e1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.consensus.standalone;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.IStateMachine.Registry;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -32,9 +32,11 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -43,20 +45,19 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
/**
* A simple consensus implementation, which can be used when replicaNum is 1.
*
* <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
- *
- * <p>any module can use `IConsensus consensusImpl = new StandAloneConsensus(id -> new
- * EmptyStateMachine());` to perform an initialization implementation.
*/
class StandAloneConsensus implements IConsensus {
+ private final Logger logger = LoggerFactory.getLogger(StandAloneConsensus.class);
+
private final TEndPoint thisNode;
private final File storageDir;
private final IStateMachine.Registry registry;
@@ -71,16 +72,21 @@ class StandAloneConsensus implements IConsensus {
@Override
public void start() throws IOException {
- if (!this.storageDir.exists()) {
- storageDir.mkdirs();
+ initAndRecover();
+ }
+
+ private void initAndRecover() throws IOException {
+ if (!storageDir.exists()) {
+ if (!storageDir.mkdirs()) {
+ logger.warn("Unable to create consensus dir at {}", storageDir);
+ }
} else {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
for (Path path : stream) {
- String filename = path.getFileName().toString();
- String[] items = filename.split("_");
- TConsensusGroupType type = TConsensusGroupType.valueOf(items[0]);
- ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createEmpty(type);
- consensusGroupId.setId(Integer.parseInt(items[1]));
+ String[] items = path.getFileName().toString().split("_");
+ ConsensusGroupId consensusGroupId =
+ ConsensusGroupId.Factory.create(
+ TConsensusGroupType.valueOf(items[0]).getValue(), Integer.parseInt(items[1]));
TEndPoint endPoint = new TEndPoint(items[2], Integer.parseInt(items[3]));
stateMachineMap.put(
consensusGroupId,
@@ -96,40 +102,24 @@ class StandAloneConsensus implements IConsensus {
@Override
public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
- AtomicReference<TSStatus> result = new AtomicReference<>();
- stateMachineMap.computeIfPresent(
- groupId,
- (k, v) -> {
- // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect
- // performance
- result.set(v.write(request));
- return v;
- });
- if (result.get() == null) {
+ StandAloneServerImpl impl = stateMachineMap.get(groupId);
+ if (impl == null) {
return ConsensusWriteResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
.build();
}
- return ConsensusWriteResponse.newBuilder().setStatus(result.get()).build();
+ return ConsensusWriteResponse.newBuilder().setStatus(impl.write(request)).build();
}
@Override
public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
- AtomicReference<DataSet> result = new AtomicReference<>();
- stateMachineMap.computeIfPresent(
- groupId,
- (k, v) -> {
- // TODO make Statemachine thread-safe to avoid thread-safe ways like this that may affect
- // performance
- result.set(v.read(request));
- return v;
- });
- if (result.get() == null) {
+ StandAloneServerImpl impl = stateMachineMap.get(groupId);
+ if (impl == null) {
return ConsensusReadResponse.newBuilder()
.setException(new ConsensusGroupNotExistException(groupId))
.build();
}
- return ConsensusReadResponse.newBuilder().setDataSet(result.get()).build();
+ return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
}
@Override
@@ -140,6 +130,11 @@ class StandAloneConsensus implements IConsensus {
.setException(new IllegalPeerNumException(consensusGroupSize))
.build();
}
+ if (!Objects.equals(thisNode, peers.get(0).getEndpoint())) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new IllegalPeerEndpointException(thisNode, peers.get(0).getEndpoint()))
+ .build();
+ }
AtomicBoolean exist = new AtomicBoolean(true);
stateMachineMap.computeIfAbsent(
groupId,
@@ -148,19 +143,11 @@ class StandAloneConsensus implements IConsensus {
StandAloneServerImpl impl =
new StandAloneServerImpl(peers.get(0), registry.apply(groupId));
impl.start();
- String groupPath =
- storageDir
- + File.separator
- + groupId.getType()
- + "_"
- + groupId.getId()
- + "_"
- + peers.get(0).getEndpoint().ip
- + "_"
- + peers.get(0).getEndpoint().port;
- File file = new File(groupPath);
- file.mkdirs();
-
+ String path = buildPeerDir(groupId);
+ File file = new File(path);
+ if (!file.mkdirs()) {
+ logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
+ }
return impl;
});
if (exist.get()) {
@@ -177,20 +164,13 @@ class StandAloneConsensus implements IConsensus {
stateMachineMap.computeIfPresent(
groupId,
(k, v) -> {
- String groupPath =
- storageDir
- + File.separator
- + groupId.getType()
- + "_"
- + groupId.getId()
- + "_"
- + thisNode.ip
- + "_"
- + thisNode.port;
- File file = new File(groupPath);
- file.delete();
exist.set(true);
v.stop();
+ String path = buildPeerDir(groupId);
+ File file = new File(path);
+ if (!file.delete()) {
+ logger.warn("Unable to delete consensus dir for group {} at {}", groupId, path);
+ }
return null;
});
@@ -239,4 +219,16 @@ class StandAloneConsensus implements IConsensus {
}
return new Peer(groupId, thisNode);
}
+
+ private String buildPeerDir(ConsensusGroupId groupId) {
+ return storageDir
+ + File.separator
+ + groupId.getType()
+ + "_"
+ + groupId.getId()
+ + "_"
+ + thisNode.getIp()
+ + "_"
+ + thisNode.getPort();
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index be0dbaa6c7..4c55e9676b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -20,11 +20,11 @@
package org.apache.iotdb.consensus.standalone;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import java.io.File;
import java.nio.ByteBuffer;
@@ -69,17 +69,21 @@ public class StandAloneServerImpl implements IStateMachine {
@Override
public boolean takeSnapshot(ByteBuffer metadata, File snapshotDir) {
- return false;
+ return stateMachine.takeSnapshot(metadata, snapshotDir);
}
@Override
public SnapshotMeta getLatestSnapshot(File snapshotDir) {
- return null;
+ return stateMachine.getLatestSnapshot(snapshotDir);
}
@Override
- public void loadSnapshot(SnapshotMeta latest) {}
+ public void loadSnapshot(SnapshotMeta latest) {
+ stateMachine.loadSnapshot(latest);
+ }
@Override
- public void cleanUpOldSnapshots(File snapshotDir) {}
+ public void cleanUpOldSnapshots(File snapshotDir) {
+ stateMachine.cleanUpOldSnapshots(snapshotDir);
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
similarity index 97%
rename from consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
rename to consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index 8693d1f6ce..118b057d1a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.statemachine;
+package org.apache.iotdb.consensus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index fe209ba019..962dd84d95 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
index 0c9f0e74f6..a166893b6b 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.EmptyStateMachine;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -48,7 +48,7 @@ public class RecoveryTest {
consensusImpl =
ConsensusFactory.getConsensusImpl(
STANDALONE_CONSENSUS_CLASS_NAME,
- new TEndPoint("localhost", 9000),
+ new TEndPoint("0.0.0.0", 9000),
new File("./target/recovery"),
gid -> new EmptyStateMachine())
.orElseThrow(
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 986e347356..1222598444 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.EmptyStateMachine;
import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.SnapshotMeta;
@@ -36,9 +38,8 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
-import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
@@ -134,7 +135,7 @@ public class StandAloneConsensusTest {
consensusImpl =
ConsensusFactory.getConsensusImpl(
STANDALONE_CONSENSUS_CLASS_NAME,
- new TEndPoint("localhost", 6667),
+ new TEndPoint("0.0.0.0", 6667),
new File("./target/standalone"),
gid -> {
switch (gid.getType()) {
@@ -186,11 +187,18 @@ public class StandAloneConsensusTest {
assertTrue(response3.getException() instanceof IllegalPeerNumException);
ConsensusGenericResponse response4 =
+ consensusImpl.addConsensusGroup(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, new TEndPoint("0.0.0.1", 6667))));
+ assertFalse(response4.isSuccess());
+ assertTrue(response4.getException() instanceof IllegalPeerEndpointException);
+
+ ConsensusGenericResponse response5 =
consensusImpl.addConsensusGroup(
schemaRegionId,
Collections.singletonList(new Peer(schemaRegionId, new TEndPoint("0.0.0.0", 6667))));
- assertTrue(response4.isSuccess());
- assertNull(response4.getException());
+ assertTrue(response5.isSuccess());
+ assertNull(response5.getException());
}
@Test
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index aee4bcee55..f5c64f2ad0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -22,53 +22,63 @@ package org.apache.iotdb.commons.consensus;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-public interface ConsensusGroupId {
+import java.util.Objects;
- // return specific id
- int getId();
+// we abstract this class to hide word `ConsensusGroup` for IoTDB StorageEngine/SchemaEngine
+public abstract class ConsensusGroupId {
- void setId(int id);
+ protected int id;
+
+ // return specific id
+ public int getId() {
+ return id;
+ }
// return specific type
- TConsensusGroupType getType();
+ public abstract TConsensusGroupType getType();
- class Factory {
- public static ConsensusGroupId createEmpty(TConsensusGroupType type) {
+ @Override
+ public int hashCode() {
+ return Objects.hash(getType(), getId());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ConsensusGroupId that = (ConsensusGroupId) o;
+ return getId() == that.getId() && getType() == that.getType();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s[%d]", getType(), getId());
+ }
+
+ public static class Factory {
+
+ public static ConsensusGroupId create(int type, int id) {
ConsensusGroupId groupId;
- switch (type) {
- case DataRegion:
- groupId = new DataRegionId();
- break;
- case SchemaRegion:
- groupId = new SchemaRegionId();
- break;
- case PartitionRegion:
- groupId = new PartitionRegionId();
- break;
- default:
- throw new IllegalArgumentException("unrecognized id type " + type);
+ if (type == TConsensusGroupType.DataRegion.getValue()) {
+ groupId = new DataRegionId(id);
+ } else if (type == TConsensusGroupType.SchemaRegion.getValue()) {
+ groupId = new SchemaRegionId(id);
+ } else if (type == TConsensusGroupType.PartitionRegion.getValue()) {
+ groupId = new PartitionRegionId(id);
+ } else {
+ throw new IllegalArgumentException(
+ "Unrecognized TConsensusGroupType: " + type + " with id = " + id);
}
return groupId;
}
- public static ConsensusGroupId convertFromTConsensusGroupId(
+ public static ConsensusGroupId createFromTConsensusGroupId(
TConsensusGroupId tConsensusGroupId) {
- ConsensusGroupId groupId = createEmpty(tConsensusGroupId.getType());
- groupId.setId(tConsensusGroupId.getId());
- return groupId;
- }
-
- public static TConsensusGroupId convertToTConsensusGroupId(ConsensusGroupId consensusGroupId) {
- TConsensusGroupId result = new TConsensusGroupId();
- if (consensusGroupId instanceof SchemaRegionId) {
- result.setType(TConsensusGroupType.SchemaRegion);
- } else if (consensusGroupId instanceof DataRegionId) {
- result.setType(TConsensusGroupType.DataRegion);
- } else if (consensusGroupId instanceof PartitionRegionId) {
- result.setType(TConsensusGroupType.PartitionRegion);
- }
- result.setId(consensusGroupId.getId());
- return result;
+ return create(tConsensusGroupId.getType().getValue(), tConsensusGroupId.getId());
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index 81fed3fe62..03ec5e4d7d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import java.util.Objects;
-
-public class DataRegionId implements ConsensusGroupId {
-
- private int id;
-
- public DataRegionId() {}
+public class DataRegionId extends ConsensusGroupId {
public DataRegionId(int id) {
this.id = id;
}
- @Override
- public int getId() {
- return id;
- }
-
- @Override
- public void setId(int id) {
- this.id = id;
- }
-
@Override
public TConsensusGroupType getType() {
return TConsensusGroupType.DataRegion;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DataRegionId that = (DataRegionId) o;
- return id == that.id;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, TConsensusGroupType.DataRegion);
- }
-
- public String toString() {
- return String.format("%s[%d]", getType(), getId());
- }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 6f317547ed..82cb7a2fed 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import java.util.Objects;
-
-public class PartitionRegionId implements ConsensusGroupId {
-
- private int id;
-
- public PartitionRegionId() {}
+public class PartitionRegionId extends ConsensusGroupId {
public PartitionRegionId(int id) {
this.id = id;
}
- @Override
- public int getId() {
- return id;
- }
-
- @Override
- public void setId(int id) {
- this.id = id;
- }
-
@Override
public TConsensusGroupType getType() {
return TConsensusGroupType.PartitionRegion;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PartitionRegionId that = (PartitionRegionId) o;
- return id == that.id;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, TConsensusGroupType.PartitionRegion);
- }
-
- public String toString() {
- return String.format("%s[%d]", getType(), getId());
- }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 61c4ca80c5..48b9cbf820 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -21,51 +21,14 @@ package org.apache.iotdb.commons.consensus;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import java.util.Objects;
-
-public class SchemaRegionId implements ConsensusGroupId {
-
- private int id;
-
- public SchemaRegionId() {}
+public class SchemaRegionId extends ConsensusGroupId {
public SchemaRegionId(int id) {
this.id = id;
}
- @Override
- public int getId() {
- return id;
- }
-
- @Override
- public void setId(int id) {
- this.id = id;
- }
-
@Override
public TConsensusGroupType getType() {
return TConsensusGroupType.SchemaRegion;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SchemaRegionId that = (SchemaRegionId) o;
- return id == that.id;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, TConsensusGroupType.SchemaRegion);
- }
-
- public String toString() {
- return String.format("%s[%d]", getType(), getId());
- }
}
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
index af19aea35e..f00b6dbb9c 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
@@ -34,14 +34,14 @@ public class ConsensusGroupIdTest {
@Test
public void TestCreate() throws IOException {
ConsensusGroupId dataRegionId =
- ConsensusGroupId.Factory.convertFromTConsensusGroupId(
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(
new TConsensusGroupId(TConsensusGroupType.DataRegion, 1));
Assert.assertTrue(dataRegionId instanceof DataRegionId);
Assert.assertEquals(1, dataRegionId.getId());
Assert.assertEquals(TConsensusGroupType.DataRegion, dataRegionId.getType());
ConsensusGroupId schemaRegionId =
- ConsensusGroupId.Factory.convertFromTConsensusGroupId(
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(
new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2));
Assert.assertTrue(schemaRegionId instanceof SchemaRegionId);
Assert.assertEquals(2, schemaRegionId.getId());
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 1fdaf2c7d0..2298d69272 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -20,10 +20,10 @@
package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index b4be544d91..7f0b3a9489 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -89,7 +89,7 @@ public class InternalServiceImpl implements InternalService.Iface {
public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
QueryType type = QueryType.valueOf(req.queryType);
ConsensusGroupId groupId =
- ConsensusGroupId.Factory.convertFromTConsensusGroupId(req.getConsensusGroupId());
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
switch (type) {
case READ:
ConsensusReadResponse readResp =
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 990c20f933..00bf02cb0b 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -85,7 +85,7 @@ public class InternalServiceImplTest {
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
ConsensusImpl.getInstance()
.addConsensusGroup(
- ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId()),
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
genPeerList(regionReplicaSet));
internalServiceImpl = new InternalServiceImpl();
}
@@ -95,7 +95,7 @@ public class InternalServiceImplTest {
TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
ConsensusImpl.getInstance()
.removeConsensusGroup(
- ConsensusGroupId.Factory.convertFromTConsensusGroupId(regionReplicaSet.getRegionId()));
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()));
FileUtils.deleteFully(new File(conf.getConsensusDir()));
}