You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/10 02:27:22 UTC

[iotdb] branch expr_vgraft updated: move extensions to expr

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new a715f4e701 move extensions to expr
a715f4e701 is described below

commit a715f4e70141c11322942b198a323e7fb59c9e75
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Oct 10 10:27:16 2022 +0800

    move extensions to expr
---
 .../logtypes => expr/craft}/FragmentedLog.java     |  2 +-
 .../craft}/FragmentedLogDispatcher.java            |  9 +--
 .../nbraft}/SlidingWindowLogAppender.java          |  4 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    | 85 +++++++++-------------
 .../org/apache/iotdb/cluster/log/LogParser.java    |  2 +-
 .../cluster/log/applier/AsyncDataLogApplier.java   |  2 +-
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  2 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |  2 +-
 .../cluster/server/member/DataGroupMember.java     |  2 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  4 +-
 10 files changed, 48 insertions(+), 66 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLog.java
similarity index 99%
rename from cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLog.java
index 92cb82aa4f..6d0894c46d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLog.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.log.logtypes;
+package org.apache.iotdb.cluster.expr.craft;
 
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.utils.encode.rs.ReedSolomon;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
similarity index 93%
rename from cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
index 96d3713fab..a2075e45f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/craft/FragmentedLogDispatcher.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.log;
+package org.apache.iotdb.cluster.expr.craft;
 
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.log.LogDispatcher;
+import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -61,8 +62,6 @@ public class FragmentedLogDispatcher extends LogDispatcher {
             "Log queue[{}] of {} is full, ignore the request to this node",
             entry.left,
             member.getName());
-      } else {
-        request.setEnqueueTime(System.nanoTime());
       }
     }
     Statistic.LOG_DISPATCHER_LOG_ENQUEUE.calOperationCostTimeFromStart(startTime);
@@ -80,7 +79,7 @@ public class FragmentedLogDispatcher extends LogDispatcher {
 
   class DispatcherThread extends LogDispatcher.DispatcherThread {
 
-    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
+    protected DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
       super(receiver, logBlockingDeque);
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/nbraft/SlidingWindowLogAppender.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/expr/nbraft/SlidingWindowLogAppender.java
index 9303580d31..8fb320107c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/nbraft/SlidingWindowLogAppender.java
@@ -17,10 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.log.appender;
+package org.apache.iotdb.cluster.expr.nbraft;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.appender.LogAppender;
+import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 65f421f651..3d53d51569 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -19,6 +19,22 @@
 
 package org.apache.iotdb.cluster.log;
 
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPATCHER_LOG_ENQUEUE_SINGLE;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -32,7 +48,6 @@ import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.PeerInfo;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -42,28 +57,11 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.utils.Pair;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPATCHER_LOG_ENQUEUE_SINGLE;
-
 /**
  * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
  * followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -74,13 +72,12 @@ import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPAT
 public class LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-  RaftMember member;
+  protected RaftMember member;
   private static final ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
   protected boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
-  Map<Node, BlockingQueue<SendLogRequest>> nodesLogQueueMap = new HashMap<>();
-  List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
-  Map<Node, Boolean> nodesEnabled;
-  Map<Node, ExecutorService> executorServices = new HashMap<>();
+  protected List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new ArrayList<>();
+  protected Map<Node, Boolean> nodesEnabled;
+  protected Map<Node, ExecutorService> executorServices = new HashMap<>();
   protected boolean queueOrdered =
       !(clusterConfig.isUseFollowerSlidingWindow() && clusterConfig.isEnableWeakAcceptance());
 
@@ -120,15 +117,17 @@ public class LogDispatcher {
   @TestOnly
   public void close() throws InterruptedException {
     for (Entry<Node, ExecutorService> entry : executorServices.entrySet()) {
-      ExecutorService value = entry.getValue();
-      value.shutdownNow();
-      value.awaitTermination(10, TimeUnit.SECONDS);
+      ExecutorService pool = entry.getValue();
+      pool.shutdownNow();
+      boolean closeSucceeded = pool.awaitTermination(10, TimeUnit.SECONDS);
+      if (!closeSucceeded) {
+        logger.warn("Cannot shut down dispatcher pool of {}-{}", member.getName(), entry.getKey());
+      }
     }
   }
 
   protected SendLogRequest transformRequest(Node node, SendLogRequest request) {
-    SendLogRequest newRequest = new SendLogRequest(request);
-    return newRequest;
+    return new SendLogRequest(request);
   }
 
   protected boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
@@ -162,7 +161,8 @@ public class LogDispatcher {
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
-    List<Node> verifiers = null;
+
+    List<Node> verifiers = Collections.emptyList();
     if (clusterConfig.isUseVGRaft()) {
       verifiers = member.getTrustValueHolder().chooseVerifiers();
     }
@@ -186,8 +186,6 @@ public class LogDispatcher {
               "Log queue[{}] of {} is full, ignore the request to this node",
               entry.left,
               member.getName());
-        } else {
-          request.setEnqueueTime(System.nanoTime());
         }
       } catch (IllegalStateException e) {
         logger.debug(
@@ -212,7 +210,6 @@ public class LogDispatcher {
 
     private VotingLog votingLog;
     private AppendEntryRequest appendEntryRequest;
-    private long enqueueTime;
     private Future<ByteBuffer> serializedLogFuture;
     private int quorumSize;
     private boolean isVerifier;
@@ -227,7 +224,6 @@ public class LogDispatcher {
       this.setVotingLog(request.votingLog);
       this.setAppendEntryRequest(request.appendEntryRequest);
       this.setQuorumSize(request.quorumSize);
-      this.setEnqueueTime(request.enqueueTime);
       this.serializedLogFuture = request.serializedLogFuture;
     }
 
@@ -239,14 +235,6 @@ public class LogDispatcher {
       this.votingLog = votingLog;
     }
 
-    public long getEnqueueTime() {
-      return enqueueTime;
-    }
-
-    public void setEnqueueTime(long enqueueTime) {
-      this.enqueueTime = enqueueTime;
-    }
-
     public AppendEntryRequest getAppendEntryRequest() {
       return appendEntryRequest;
     }
@@ -268,29 +256,22 @@ public class LogDispatcher {
       return "SendLogRequest{" + "log=" + votingLog + '}';
     }
 
-    public boolean isVerifier() {
-      return isVerifier;
-    }
-
     public void setVerifier(boolean verifier) {
       isVerifier = verifier;
     }
   }
 
-  class DispatcherThread implements Runnable {
+  protected class DispatcherThread implements Runnable {
 
     Node receiver;
-    private BlockingQueue<SendLogRequest> logBlockingDeque;
+    private final BlockingQueue<SendLogRequest> logBlockingDeque;
     protected List<SendLogRequest> currBatch = new ArrayList<>();
-    private PeerInfo peerInfo;
     private Client syncClient;
-    AsyncClient asyncClient;
-    private String baseName;
+    private final String baseName;
 
-    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
+    protected DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
       this.receiver = receiver;
       this.logBlockingDeque = logBlockingDeque;
-      this.peerInfo = member.getPeer(receiver);
       baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 152cea5d10..e5f33a0174 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.cluster.log.Log.Types;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RequestLog;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 97a621de04..a3ca651515 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index ff04103529..23784929d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index fa4c70a7e7..488f63c884 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.NodeCharacter;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 16db321abe..7f748677dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.appender.BlockingLogAppender;
-import org.apache.iotdb.cluster.log.appender.SlidingWindowLogAppender;
+import org.apache.iotdb.cluster.expr.nbraft.SlidingWindowLogAppender;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.FilePartitionedSnapshotLogManager;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 7985189b81..267c0ea899 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.expr.vgraft.KeyManager;
 import org.apache.iotdb.cluster.expr.vgraft.TrustValueHolder;
 import org.apache.iotdb.cluster.log.CommitLogCallback;
 import org.apache.iotdb.cluster.log.CommitLogTask;
-import org.apache.iotdb.cluster.log.FragmentedLogDispatcher;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLogDispatcher;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
@@ -50,7 +50,7 @@ import org.apache.iotdb.cluster.log.appender.BlockingLogAppender;
 import org.apache.iotdb.cluster.log.appender.LogAppender;
 import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
-import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
+import org.apache.iotdb.cluster.expr.craft.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;