You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/11 04:33:18 UTC

[iotdb] branch expr_plus updated: remove old interface

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_plus by this push:
     new 87cc4b6  remove old interface
87cc4b6 is described below

commit 87cc4b641ccfd15e0575a64e8ef39296238cbdc0
Author: jt <jt...@163.com>
AuthorDate: Fri Mar 11 12:32:31 2022 +0800

    remove old interface
---
 .../iotdb/cluster/log/IndirectLogDispatcher.java   |  7 ++--
 .../org/apache/iotdb/cluster/log/LogRelay.java     | 11 ++++--
 .../log/manage/UnCommittedEntryManager.java        | 13 +++++++
 .../handlers/forwarder/IndirectAppendHandler.java  |  5 +--
 .../iotdb/cluster/server/member/RaftMember.java    | 45 ++++++++++++++++++----
 .../cluster/server/service/BaseAsyncService.java   |  2 -
 .../cluster/server/service/BaseSyncService.java    |  2 -
 .../server/service/DataGroupServiceImpls.java      | 19 ---------
 .../cluster/server/service/MetaAsyncService.java   |  2 -
 .../cluster/server/service/MetaSyncService.java    |  2 -
 10 files changed, 64 insertions(+), 44 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 9c878df..343cb3d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,17 +19,16 @@
 
 package org.apache.iotdb.cluster.log;
 
-import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -107,8 +106,8 @@ public class IndirectLogDispatcher extends LogDispatcher {
     }
 
     @Override
-    protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList,
-        List<SendLogRequest> currBatch, int firstIndex) {
+    protected AppendEntriesRequest prepareRequest(
+        List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
       return super.prepareRequest(logList, currBatch, firstIndex)
           .setSubReceivers(directToIndirectFollowerMap.get(receiver));
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 91339c0..6964558 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -102,7 +102,12 @@ public class LogRelay {
           }
         }
 
-        raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+        if (relayEntry.singleRequest != null) {
+          raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+        } else if (relayEntry.batchRequest != null) {
+          raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
+        }
+
         Statistic.RAFT_SEND_RELAY.add(1);
       }
     }
@@ -142,8 +147,8 @@ public class LogRelay {
         return false;
       }
       RelayEntry that = (RelayEntry) o;
-      return Objects.equals(singleRequest, that.singleRequest) && Objects.equals(batchRequest,
-          that.batchRequest);
+      return Objects.equals(singleRequest, that.singleRequest)
+          && Objects.equals(batchRequest, that.batchRequest);
     }
 
     @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index 1c63bff..39c32d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -121,6 +121,19 @@ public class UnCommittedEntryManager {
         }
       } catch (IndexOutOfBoundsException e) {
         // continue
+        logger.info(
+            "Invalid entryPos {}, entries size {}, last index {}, requested index {}, "
+                + "offset {}",
+            entryPos,
+            entries.size(),
+            last,
+            index,
+            offset);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException interruptedException) {
+          Thread.currentThread().interrupt();
+        }
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
index 42b750b..df06069 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.server.handlers.forwarder;
 
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 
@@ -31,9 +30,9 @@ public class IndirectAppendHandler implements AsyncMethodCallback<AppendEntryRes
 
   private static final Logger logger = LoggerFactory.getLogger(IndirectAppendHandler.class);
   private Node receiver;
-  private AppendEntryRequest request;
+  private Object request;
 
-  public IndirectAppendHandler(Node receiver, AppendEntryRequest request) {
+  public IndirectAppendHandler(Node receiver, Object request) {
     this.receiver = receiver;
     this.request = request;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0c227c5..4df17e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.server.member;
 
-import java.nio.Buffer;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.ClientManager;
@@ -111,12 +110,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.nio.Buffer;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
@@ -586,7 +585,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     return result;
   }
 
-  private AppendEntryResult appendEntryInternal(AppendEntryRequest request) throws UnknownLogTypeException {
+  private AppendEntryResult appendEntryInternal(AppendEntryRequest request)
+      throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntryRequest: {}", name, request);
     // the term checked here is that of the leader, not that of the log
     long checkResult = checkRequestTerm(request.term, request.leader);
@@ -650,6 +650,7 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
     request.setIsFromLeader(false);
+    request.setSubReceiversIsSet(false);
     for (Node subFollower : subFollowers) {
       Client syncClient = null;
       try {
@@ -670,6 +671,29 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
   }
 
+  public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
+    request.setIsFromLeader(false);
+    request.setSubReceiversIsSet(false);
+    for (Node subFollower : subFollowers) {
+      Client syncClient = null;
+      try {
+        if (config.isUseAsyncServer()) {
+          getAsyncClient(subFollower)
+              .appendEntries(request, new IndirectAppendHandler(subFollower, request));
+        } else {
+          syncClient = getSyncClient(subFollower);
+          syncClient.appendEntries(request);
+        }
+      } catch (TException e) {
+        logger.error("Cannot send {} to {}", request, subFollower, e);
+      } finally {
+        if (syncClient != null) {
+          ClientUtils.putBackSyncClient(syncClient);
+        }
+      }
+    }
+  }
+
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
   public AppendEntryResult appendEntries(AppendEntriesRequest request)
       throws UnknownLogTypeException {
@@ -722,6 +746,15 @@ public abstract class RaftMember implements RaftMemberMBean {
           request.getEntries().size(),
           response);
     }
+
+    if (!request.isFromLeader) {
+      // TODO: use batch ack
+      for (Log log : logs) {
+        appendAckLeader(request.leader, log, response.status);
+      }
+      Statistic.RAFT_SEND_RELAY_ACK.add(1);
+    }
+
     return response;
   }
 
@@ -2144,11 +2177,9 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
 
     if (config.isUseAsyncServer()) {
-      sendLogAsync(
-          log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+      sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
     } else {
-      sendLogSync(
-          log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+      sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index a521bb3..d8cece7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
@@ -47,7 +46,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
-import java.util.List;
 
 public abstract class BaseAsyncService implements RaftService.AsyncIface {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index a8f2704..24caee3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -47,7 +46,6 @@ import java.io.IOException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
-import java.util.List;
 
 public abstract class BaseSyncService implements RaftService.Iface {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index 6c9dc83..3ead63f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -768,30 +768,11 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
-      throws TException {
-    return DataGroupEngine.getInstance()
-        .getDataSyncService(request.getHeader())
-        .appendEntryIndirect(request, subReceivers);
-  }
-
-  @Override
   public void acknowledgeAppendEntry(AppendEntryResult ack) throws TException {
     DataGroupEngine.getInstance().getDataSyncService(ack.getHeader()).acknowledgeAppendEntry(ack);
   }
 
   @Override
-  public void appendEntryIndirect(
-      AppendEntryRequest request,
-      List<Node> subReceivers,
-      AsyncMethodCallback<AppendEntryResult> resultHandler)
-      throws TException {
-    DataGroupEngine.getInstance()
-        .getDataAsyncService(request.getHeader(), resultHandler, request)
-        .appendEntryIndirect(request, subReceivers, resultHandler);
-  }
-
-  @Override
   public void acknowledgeAppendEntry(AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler)
       throws TException {
     DataGroupEngine.getInstance()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 32cba96..1db484e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -47,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
 public class MetaAsyncService extends BaseAsyncService implements TSMetaService.AsyncIface {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index aae1cb7..7a6eb2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -48,7 +47,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
 public class MetaSyncService extends BaseSyncService implements TSMetaService.Iface {