You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/11 04:33:18 UTC
[iotdb] branch expr_plus updated: remove old interface
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_plus by this push:
new 87cc4b6 remove old interface
87cc4b6 is described below
commit 87cc4b641ccfd15e0575a64e8ef39296238cbdc0
Author: jt <jt...@163.com>
AuthorDate: Fri Mar 11 12:32:31 2022 +0800
remove old interface
---
.../iotdb/cluster/log/IndirectLogDispatcher.java | 7 ++--
.../org/apache/iotdb/cluster/log/LogRelay.java | 11 ++++--
.../log/manage/UnCommittedEntryManager.java | 13 +++++++
.../handlers/forwarder/IndirectAppendHandler.java | 5 +--
.../iotdb/cluster/server/member/RaftMember.java | 45 ++++++++++++++++++----
.../cluster/server/service/BaseAsyncService.java | 2 -
.../cluster/server/service/BaseSyncService.java | 2 -
.../server/service/DataGroupServiceImpls.java | 19 ---------
.../cluster/server/service/MetaAsyncService.java | 2 -
.../cluster/server/service/MetaSyncService.java | 2 -
10 files changed, 64 insertions(+), 44 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 9c878df..343cb3d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,17 +19,16 @@
package org.apache.iotdb.cluster.log;
-import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -107,8 +106,8 @@ public class IndirectLogDispatcher extends LogDispatcher {
}
@Override
- protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList,
- List<SendLogRequest> currBatch, int firstIndex) {
+ protected AppendEntriesRequest prepareRequest(
+ List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
return super.prepareRequest(logList, currBatch, firstIndex)
.setSubReceivers(directToIndirectFollowerMap.get(receiver));
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 91339c0..6964558 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -102,7 +102,12 @@ public class LogRelay {
}
}
- raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+ if (relayEntry.singleRequest != null) {
+ raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+ } else if (relayEntry.batchRequest != null) {
+ raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
+ }
+
Statistic.RAFT_SEND_RELAY.add(1);
}
}
@@ -142,8 +147,8 @@ public class LogRelay {
return false;
}
RelayEntry that = (RelayEntry) o;
- return Objects.equals(singleRequest, that.singleRequest) && Objects.equals(batchRequest,
- that.batchRequest);
+ return Objects.equals(singleRequest, that.singleRequest)
+ && Objects.equals(batchRequest, that.batchRequest);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index 1c63bff..39c32d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -121,6 +121,19 @@ public class UnCommittedEntryManager {
}
} catch (IndexOutOfBoundsException e) {
// continue
+ logger.info(
+ "Invalid entryPos {}, entries size {}, last index {}, requested index {}, "
+ + "offset {}",
+ entryPos,
+ entries.size(),
+ last,
+ index,
+ offset);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
index 42b750b..df06069 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.server.handlers.forwarder;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -31,9 +30,9 @@ public class IndirectAppendHandler implements AsyncMethodCallback<AppendEntryRes
private static final Logger logger = LoggerFactory.getLogger(IndirectAppendHandler.class);
private Node receiver;
- private AppendEntryRequest request;
+ private Object request;
- public IndirectAppendHandler(Node receiver, AppendEntryRequest request) {
+ public IndirectAppendHandler(Node receiver, Object request) {
this.receiver = receiver;
this.request = request;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0c227c5..4df17e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.server.member;
-import java.nio.Buffer;
import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.client.ClientCategory;
import org.apache.iotdb.cluster.client.ClientManager;
@@ -111,12 +110,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketTimeoutException;
+import java.nio.Buffer;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
@@ -586,7 +585,8 @@ public abstract class RaftMember implements RaftMemberMBean {
return result;
}
- private AppendEntryResult appendEntryInternal(AppendEntryRequest request) throws UnknownLogTypeException {
+ private AppendEntryResult appendEntryInternal(AppendEntryRequest request)
+ throws UnknownLogTypeException {
logger.debug("{} received an AppendEntryRequest: {}", name, request);
// the term checked here is that of the leader, not that of the log
long checkResult = checkRequestTerm(request.term, request.leader);
@@ -650,6 +650,7 @@ public abstract class RaftMember implements RaftMemberMBean {
public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
request.setIsFromLeader(false);
+ request.setSubReceiversIsSet(false);
for (Node subFollower : subFollowers) {
Client syncClient = null;
try {
@@ -670,6 +671,29 @@ public abstract class RaftMember implements RaftMemberMBean {
}
}
+ public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
+ request.setIsFromLeader(false);
+ request.setSubReceiversIsSet(false);
+ for (Node subFollower : subFollowers) {
+ Client syncClient = null;
+ try {
+ if (config.isUseAsyncServer()) {
+ getAsyncClient(subFollower)
+ .appendEntries(request, new IndirectAppendHandler(subFollower, request));
+ } else {
+ syncClient = getSyncClient(subFollower);
+ syncClient.appendEntries(request);
+ }
+ } catch (TException e) {
+ logger.error("Cannot send {} to {}", request, subFollower, e);
+ } finally {
+ if (syncClient != null) {
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ }
+ }
+ }
+
/** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
public AppendEntryResult appendEntries(AppendEntriesRequest request)
throws UnknownLogTypeException {
@@ -722,6 +746,15 @@ public abstract class RaftMember implements RaftMemberMBean {
request.getEntries().size(),
response);
}
+
+ if (!request.isFromLeader) {
+ // TODO: use batch ack
+ for (Log log : logs) {
+ appendAckLeader(request.leader, log, response.status);
+ }
+ Statistic.RAFT_SEND_RELAY_ACK.add(1);
+ }
+
return response;
}
@@ -2144,11 +2177,9 @@ public abstract class RaftMember implements RaftMemberMBean {
}
if (config.isUseAsyncServer()) {
- sendLogAsync(
- log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+ sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
} else {
- sendLogSync(
- log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
+ sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index a521bb3..d8cece7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
@@ -47,7 +46,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.util.List;
public abstract class BaseAsyncService implements RaftService.AsyncIface {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index a8f2704..24caee3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -47,7 +46,6 @@ import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.util.List;
public abstract class BaseSyncService implements RaftService.Iface {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index 6c9dc83..3ead63f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -768,30 +768,11 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
- throws TException {
- return DataGroupEngine.getInstance()
- .getDataSyncService(request.getHeader())
- .appendEntryIndirect(request, subReceivers);
- }
-
- @Override
public void acknowledgeAppendEntry(AppendEntryResult ack) throws TException {
DataGroupEngine.getInstance().getDataSyncService(ack.getHeader()).acknowledgeAppendEntry(ack);
}
@Override
- public void appendEntryIndirect(
- AppendEntryRequest request,
- List<Node> subReceivers,
- AsyncMethodCallback<AppendEntryResult> resultHandler)
- throws TException {
- DataGroupEngine.getInstance()
- .getDataAsyncService(request.getHeader(), resultHandler, request)
- .appendEntryIndirect(request, subReceivers, resultHandler);
- }
-
- @Override
public void acknowledgeAppendEntry(AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler)
throws TException {
DataGroupEngine.getInstance()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 32cba96..1db484e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -47,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.List;
public class MetaAsyncService extends BaseAsyncService implements TSMetaService.AsyncIface {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index aae1cb7..7a6eb2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -48,7 +47,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.List;
public class MetaSyncService extends BaseSyncService implements TSMetaService.Iface {