You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/07 11:26:07 UTC

[iotdb] branch expr updated: add basic sliding window

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new e214c8e  add basic sliding window
e214c8e is described below

commit e214c8e03624c9cee502351f6b51205ed2257bbe
Author: jt <jt...@163.com>
AuthorDate: Mon Jun 7 19:25:23 2021 +0800

    add basic sliding window
---
 .../org/apache/iotdb/cluster/expr/ExprMember.java  | 10 ++++++--
 .../org/apache/iotdb/cluster/expr/ExprServer.java  |  8 ++++--
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 30 +++++++++++++++-------
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  2 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  9 ++++++-
 5 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index efa63c2..0ad1ebb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -42,6 +42,9 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 public class ExprMember extends MetaGroupMember {
 
+  public static boolean bypassRaft = false;
+  public static boolean useSlidingWindow = false;
+
   public ExprMember() {
   }
 
@@ -69,7 +72,7 @@ public class ExprMember extends MetaGroupMember {
 
   @Override
   public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    if (false) {
+    if (bypassRaft) {
       if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
         return StatusUtils.OK;
       } else if (plan instanceof ExprPlan) {
@@ -99,7 +102,10 @@ public class ExprMember extends MetaGroupMember {
     return processNonPartitionedMetaPlan(plan);
   }
 
-  protected long appendEntry1(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+  protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+    if (!useSlidingWindow) {
+      return super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
+    }
     long resp;
     long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     long success = 0;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
index 338755e..4ac5559 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.expr;
 
-import io.moquette.broker.config.IConfig;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -31,7 +30,6 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.tsfile.read.filter.operator.In;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
@@ -74,6 +72,9 @@ public class ExprServer extends MetaClusterServer {
     String[] allNodeStr = args[1].split(",");
 
     int dispatcherThreadNum = Integer.parseInt(args[2]);
+    boolean useIndirectDispatcher = Boolean.parseBoolean(args[3]);
+    boolean bypassRaft = Boolean.parseBoolean(args[4]);
+    boolean useSW = Boolean.parseBoolean(args[5]);
 
     ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
     ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
@@ -81,7 +82,10 @@ public class ExprServer extends MetaClusterServer {
     ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false);
     ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
     RaftMember.USE_LOG_DISPATCHER = true;
+    RaftMember.USE_INDIRECT_LOG_DISPATCHER = useIndirectDispatcher;
     LogDispatcher.bindingThreadNum = dispatcherThreadNum;
+    ExprMember.bypassRaft = bypassRaft;
+    ExprMember.useSlidingWindow = useSW;
 
     ExprServer server = new ExprServer();
     server.start();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 75f1b16..71aa631 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -20,18 +20,20 @@
 package org.apache.iotdb.cluster.log;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import org.apache.iotdb.cluster.log.LogDispatcher.DispatcherThread;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
@@ -39,11 +41,11 @@ import org.apache.iotdb.cluster.utils.ClusterUtils;
  */
 public class IndirectLogDispatcher extends LogDispatcher {
 
-  private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
+  private static final Logger logger = LoggerFactory.getLogger(IndirectLogDispatcher.class);
+  private Map<Node, List<Node>> directToIndirectFollowerMap;
 
   public IndirectLogDispatcher(RaftMember member) {
     super(member);
-    recalculateDirectFollowerMap();
   }
 
   @Override
@@ -54,9 +56,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
   @Override
   void createQueueAndBindingThreads() {
-    for (Node node : directToIndirectFollowerMap.keySet()) {
-      nodeLogQueues.add(createQueueAndBindingThread(node));
-    }
+    recalculateDirectFollowerMap();
   }
 
   public void recalculateDirectFollowerMap() {
@@ -64,9 +64,17 @@ public class IndirectLogDispatcher extends LogDispatcher {
     allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
     QueryCoordinator instance = QueryCoordinator.getINSTANCE();
     List<Node> orderedNodes = instance.reorderNodes(allNodes);
-
     synchronized (this) {
-      directToIndirectFollowerMap.clear();
+      executorService.shutdown();
+      try {
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("Dispatcher thread pool of {} cannot be shutdown within 10s", member);
+      }
+      executorService = Executors.newCachedThreadPool();
+
+      directToIndirectFollowerMap = new HashMap<>();
       for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
         if (i != j) {
           directToIndirectFollowerMap.put(orderedNodes.get(i),
@@ -76,6 +84,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
         }
       }
     }
+
+    for (Node node : directToIndirectFollowerMap.keySet()) {
+      nodeLogQueues.add(createQueueAndBindingThread(node));
+    }
   }
 
   class DispatcherThread extends LogDispatcher.DispatcherThread {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 711b511..ae50b7c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -68,7 +68,7 @@ public class LogDispatcher {
   private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
   private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
   List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
-  private ExecutorService executorService;
+  ExecutorService executorService;
   private static ExecutorService serializationService =
       Executors.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index daa3f4e..5b248b8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.CommitLogCallback;
 import org.apache.iotdb.cluster.log.CommitLogTask;
 import org.apache.iotdb.cluster.log.HardState;
+import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
@@ -120,6 +121,7 @@ public abstract class RaftMember {
 
   private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
   public static boolean USE_LOG_DISPATCHER = false;
+  public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
 
   private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
   private static final String MSG_FORWARD_ERROR =
@@ -591,6 +593,7 @@ public abstract class RaftMember {
 
   public long appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers) throws UnknownLogTypeException {
     long result = appendEntry(request);
+    request.entry.rewind();
     appendLogThreadPool.submit(() -> sendLogToSubFollowers(request, subFollowers));
     return result;
   }
@@ -1508,7 +1511,11 @@ public abstract class RaftMember {
 
   private synchronized LogDispatcher getLogDispatcher() {
     if (logDispatcher == null) {
-      logDispatcher = new LogDispatcher(this);
+      if (USE_INDIRECT_LOG_DISPATCHER) {
+        logDispatcher = new IndirectLogDispatcher(this);
+      } else {
+        logDispatcher = new LogDispatcher(this);
+      }
     }
     return logDispatcher;
   }