You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/10 02:27:22 UTC
[iotdb] branch expr_vgraft updated: move extensions to expr
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new a715f4e701 move extensions to expr
a715f4e701 is described below
commit a715f4e70141c11322942b198a323e7fb59c9e75
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Oct 10 10:27:16 2022 +0800
move extensions to expr
---
.../logtypes => expr/craft}/FragmentedLog.java | 2 +-
.../craft}/FragmentedLogDispatcher.java | 9 +--
.../nbraft}/SlidingWindowLogAppender.java | 4 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 85 +++++++++-------------
.../org/apache/iotdb/cluster/log/LogParser.java | 2 +-
.../cluster/log/applier/AsyncDataLogApplier.java | 2 +-
.../iotdb/cluster/log/applier/DataLogApplier.java | 2 +-
.../iotdb/cluster/log/applier/MetaLogApplier.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 2 +-
.../iotdb/cluster/server/member/RaftMember.java | 4 +-
10 files changed, 48 insertions(+), 66 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLog.java
similarity index 99%
rename from cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLog.java
index 92cb82aa4f..6d0894c46d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLog.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.cluster.log.logtypes;
+package org.apache.iotdb.cluster.expr.craft;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.utils.encode.rs.ReedSolomon;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
similarity index 93%
rename from cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
index 96d3713fab..a2075e45f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.cluster.log;
+package org.apache.iotdb.cluster.expr.craft;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.log.LogDispatcher;
+import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -61,8 +62,6 @@ public class FragmentedLogDispatcher extends LogDispatcher {
"Log queue[{}] of {} is full, ignore the request to this node",
entry.left,
member.getName());
- } else {
- request.setEnqueueTime(System.nanoTime());
}
}
Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
@@ -80,7 +79,7 @@ public class FragmentedLogDispatcher extends LogDispatcher {
class DispatcherThread extends LogDispatcher.DispatcherThread {
- DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
+ protected DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
super(receiver, logBlockingDeque);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/nbraft/SlidingWindowLogAppender.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/expr/nbraft/SlidingWindowLogAppender.java
index 9303580d31..8fb320107c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/nbraft/SlidingWindowLogAppender.java
@@ -17,10 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.cluster.log.appender;
+package org.apache.iotdb.cluster.expr.nbraft;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.appender.LogAppender;
+import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 65f421f651..3d53d51569 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -19,6 +19,22 @@
package org.apache.iotdb.cluster.log;
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPATCHER_LOG_ENQUEUE_SINGLE;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -32,7 +48,6 @@ import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.PeerInfo;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -42,28 +57,11 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.utils.Pair;
-
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPATCHER_LOG_ENQUEUE_SINGLE;
-
/**
* A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
* followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -74,13 +72,12 @@ import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPAT
public class LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
- RaftMember member;
+ protected RaftMember member;
private static final ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
- Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueueMap = new HashMap<>();
- List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
- Map<Node, Boolean> nodesEnabled;
- Map<Node, ExecutorService> executorServices = new HashMap<>();
+ protected List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
+ protected Map<Node, Boolean> nodesEnabled;
+ protected Map<Node, ExecutorService> executorServices = new HashMap<>();
protected boolean queueOrdered =
!(clusterConfig.isUseFollowerSlidingWindow() && clusterConfig.isEnableWeakAcceptance());
@@ -120,15 +117,17 @@ public class LogDispatcher {
@TestOnly
public void close() throws InterruptedException {
for (Entry<Node, ExecutorService> entry : executorServices.entrySet()) {
- ExecutorService value = entry.getValue();
- value.shutdownNow();
- value.awaitTermination(10, TimeUnit.SECONDS);
+ ExecutorService pool = entry.getValue();
+ pool.shutdownNow();
+ boolean closeSucceeded = pool.awaitTermination(10, TimeUnit.SECONDS);
+ if (!closeSucceeded) {
+ logger.warn("Cannot shut down dispatcher pool of {}-{}", member.getName(), entry.getKey());
+ }
}
}
protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
- SendLogRequest newRequest = new SendLogRequest(request);
- return newRequest;
+ return new SendLogRequest(request);
}
protected boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
@@ -162,7 +161,8 @@ public class LogDispatcher {
long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
- List<Node> verifiers = null;
+
+ List<Node> verifiers = Collections.emptyList();
if (clusterConfig.isUseVGRaft()) {
verifiers = member.getTrustValueHolder().chooseVerifiers();
}
@@ -186,8 +186,6 @@ public class LogDispatcher {
"Log queue[{}] of {} is full, ignore the request to this node",
entry.left,
member.getName());
- } else {
- request.setEnqueueTime(System.nanoTime());
}
} catch (IllegalStateException e) {
logger.debug(
@@ -212,7 +210,6 @@ public class LogDispatcher {
private VotingLog votingLog;
private AppendEntryRequest appendEntryRequest;
- private long enqueueTime;
private Future<ByteBuffer> serializedLogFuture;
private int quorumSize;
private boolean isVerifier;
@@ -227,7 +224,6 @@ public class LogDispatcher {
this.setVotingLog(request.votingLog);
this.setAppendEntryRequest(request.appendEntryRequest);
this.setQuorumSize(request.quorumSize);
- this.setEnqueueTime(request.enqueueTime);
this.serializedLogFuture = request.serializedLogFuture;
}
@@ -239,14 +235,6 @@ public class LogDispatcher {
this.votingLog = votingLog;
}
- public long getEnqueueTime() {
- return enqueueTime;
- }
-
- public void setEnqueueTime(long enqueueTime) {
- this.enqueueTime = enqueueTime;
- }
-
public AppendEntryRequest getAppendEntryRequest() {
return appendEntryRequest;
}
@@ -268,29 +256,22 @@ public class LogDispatcher {
return "SendLogRequest{" + "log=" + votingLog + '}';
}
- public boolean isVerifier() {
- return isVerifier;
- }
-
public void setVerifier(boolean verifier) {
isVerifier = verifier;
}
}
- class DispatcherThread implements Runnable {
+ protected class DispatcherThread implements Runnable {
Node receiver;
- private BlockingQueue<SendLogRequest> logBlockingDeque;
+ private final BlockingQueue<SendLogRequest> logBlockingDeque;
protected List<SendLogRequest> currBatch = new ArrayList<>();
- private PeerInfo peerInfo;
private Client syncClient;
- AsyncClient asyncClient;
- private String baseName;
+ private final String baseName;
- DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
+ protected DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
this.receiver = receiver;
this.logBlockingDeque = logBlockingDeque;
- this.peerInfo = member.getPeer(receiver);
baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 152cea5d10..e5f33a0174 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.cluster.log.Log.Types;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RequestLog;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 97a621de04..a3ca651515 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index ff04103529..23784929d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index fa4c70a7e7..488f63c884 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.exception.ChangeMembershipException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.NodeCharacter;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 16db321abe..7f748677dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.appender.BlockingLogAppender;
-import org.apache.iotdb.cluster.log.appender.SlidingWindowLogAppender;
+import org.apache.iotdb.cluster.expr.nbraft.SlidingWindowLogAppender;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 7985189b81..267c0ea899 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
import org.apache.iotdb.cluster.log.CommitLogCallback;
import org.apache.iotdb.cluster.log.CommitLogTask;
-import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLogDispatcher;
import org.apache.iotdb.cluster.log.HardState;
import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
import org.apache.iotdb.cluster.log.Log;
@@ -50,7 +50,7 @@ import org.apache.iotdb.cluster.log.appender.BlockingLogAppender;
import org.apache.iotdb.cluster.log.appender.LogAppender;
import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;