You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/21 07:39:29 UTC
[iotdb] branch expr updated: reuse client in LogDispatcher; add commitIndex in heartbeat to update peer index; remove timer threshold
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new e6bf459 reuse client in LogDispatcher; add commitIndex in heartbeat to update peer index; remove timer threshold
e6bf459 is described below
commit e6bf459c74fe57256b6b5932e296bbad6a89f466
Author: jt <jt...@163.com>
AuthorDate: Tue Dec 21 15:36:00 2021 +0800
reuse client in LogDispatcher
add commitIndex in heartbeat to update peer index
remove timer threshold
---
.../java/org/apache/iotdb/cluster/log/Log.java | 6 +++-
.../apache/iotdb/cluster/log/LogDispatcher.java | 10 +++----
.../iotdb/cluster/log/catchup/CatchUpTask.java | 8 +++--
.../cluster/log/catchup/SnapshotCatchUpTask.java | 35 +++++++++++++---------
.../iotdb/cluster/log/logtypes/CloseFileLog.java | 2 +-
.../cluster/log/logtypes/EmptyContentLog.java | 2 +-
.../iotdb/cluster/log/logtypes/LargeTestLog.java | 2 +-
.../cluster/log/logtypes/PhysicalPlanLog.java | 2 +-
.../server/handlers/caller/HeartbeatHandler.java | 6 +---
.../iotdb/cluster/server/member/RaftMember.java | 11 ++++---
.../apache/iotdb/cluster/server/monitor/Timer.java | 9 ------
thrift-cluster/src/main/thrift/cluster.thrift | 1 +
12 files changed, 49 insertions(+), 45 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 4845bba..59dd534 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -33,7 +33,7 @@ public abstract class Log implements Comparable<Log> {
Comparator.comparingLong(Log::getCurrLogIndex).thenComparing(Log::getCurrLogTerm);
// make this configurable or adaptive
- public static int DEFAULT_BUFFER_SIZE = 16 * 1024;
+ private static final int DEFAULT_BUFFER_SIZE = 16 * 1024;
private long currLogIndex;
private long currLogTerm;
@@ -48,6 +48,10 @@ public abstract class Log implements Comparable<Log> {
private int byteSize = 0;
+ public static int getDefaultBufferSize() {
+ return DEFAULT_BUFFER_SIZE;
+ }
+
public abstract ByteBuffer serialize();
public abstract void deserialize(ByteBuffer buffer);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ba8f55f..ffcdb8f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -320,10 +320,8 @@ public class LogDispatcher {
}
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
- Client client = member.getSyncClient(receiver);
if (client == null) {
- logger.error("No available client for {}", receiver);
- return;
+ client = member.getSyncClient(receiver);
}
AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -333,10 +331,10 @@ public class LogDispatcher {
handler.onComplete(result);
} catch (TException e) {
client.getInputProtocol().getTransport().close();
- handler.onError(e);
- logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
- } finally {
ClientUtils.putBackSyncClient(client);
+ client = member.getSyncClient(receiver);
+ logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
+ handler.onError(e);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 0552e2e..061490f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -59,6 +59,8 @@ public class CatchUpTask implements Runnable {
private String name;
private int raftId;
+ private long startTime;
+
public CatchUpTask(Node node, int raftId, Peer peer, RaftMember raftMember, long lastLogIdx) {
this.node = node;
this.raftId = raftId;
@@ -345,6 +347,7 @@ public class CatchUpTask implements Runnable {
@Override
public void run() {
+ startTime = System.currentTimeMillis();
try {
boolean findMatchedIndex = checkMatchIndex();
if (abort) {
@@ -378,10 +381,11 @@ public class CatchUpTask implements Runnable {
}
if (logger.isInfoEnabled()) {
logger.info(
- "{}: Catch up {} finished, update it's matchIndex to {}",
+ "{}: Catch up {} finished, update it's matchIndex to {}, time consumption: {}ms",
raftMember.getName(),
node,
- peer.getMatchIndex());
+ peer.getMatchIndex(),
+ System.currentTimeMillis() - startTime);
}
peer.resetInconsistentHeartbeatNum();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index b11f886..f32edad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -36,6 +36,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Callable;
@@ -113,27 +114,33 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
}
private boolean sendSnapshotSync(SendSnapshotRequest request) throws TException {
- logger.info(
- "{}: sending a snapshot request size={} to {}",
- raftMember.getName(),
- request.getSnapshotBytes().length,
- node);
- Client client = raftMember.getSyncClient(node);
- if (client == null) {
- return false;
- }
- try {
+ int retry = 5;
+ for (int i = 0; i < retry; i++) {
+ logger.info(
+ "{}: sending a snapshot request size={} to {}",
+ raftMember.getName(),
+ request.getSnapshotBytes().length,
+ node);
+
+ Client client = raftMember.getSyncClient(node);
+ if (client == null) {
+ return false;
+ }
+
try {
client.sendSnapshot(request);
logger.info("{}: snapshot is sent to {}", raftMember.getName(), node);
return true;
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
- throw e;
+ if (!(e.getCause() instanceof SocketTimeoutException)) {
+ client.getInputProtocol().getTransport().close();
+ throw e;
+ }
+ } finally {
+ ClientUtils.putBackSyncClient(client);
}
- } finally {
- ClientUtils.putBackSyncClient(client);
}
+ return false;
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java
index 675b7fe..bade7a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java
@@ -46,7 +46,7 @@ public class CloseFileLog extends Log {
@Override
public ByteBuffer serialize() {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(getDefaultBufferSize());
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) CLOSE_FILE.ordinal());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java
index 1e785d5..cc9cec6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java
@@ -39,7 +39,7 @@ public class EmptyContentLog extends Log {
@Override
public ByteBuffer serialize() {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(getDefaultBufferSize());
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) EMPTY_CONTENT.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java
index 32e4a1c..12a8e61 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java
@@ -38,7 +38,7 @@ public class LargeTestLog extends Log {
@Override
public ByteBuffer serialize() {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(getDefaultBufferSize());
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) TEST_LARGE_CONTENT.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index 7ecdf5f..1f3882b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -48,7 +48,7 @@ public class PhysicalPlanLog extends Log {
@Override
public ByteBuffer serialize() {
- PublicBAOS byteArrayOutputStream = new PublicBAOS(DEFAULT_BUFFER_SIZE);
+ PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize());
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) PHYSICAL_PLAN.ordinal());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index bccb16f..8e885cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -108,11 +108,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
if (!localMember.getLogManager().isLogUpToDate(lastLogTerm, lastLogIdx)
|| !localMember.getLogManager().matchTerm(lastLogTerm, lastLogIdx)) {
// the follower is not up-to-date
- if (lastLogIdx == -1) {
- // maybe the follower has restarted, so we need to find its match index again, because
- // some logs may be lost due to restart
- peer.setMatchIndex(-1);
- }
+ peer.setMatchIndex(Math.max(peer.getMatchIndex(), resp.getCommitIndex()));
// only start a catch up when the follower's lastLogIndex remains stall and unchanged for 3
// heartbeats
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 942379f..287330c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -445,6 +445,7 @@ public abstract class RaftMember implements RaftMemberMBean {
// tell the leader the local log progress so it may decide whether to perform a catch up
response.setLastLogIndex(logManager.getLastLogIndex());
response.setLastLogTerm(logManager.getLastLogTerm());
+ response.setCommitIndex(logManager.getCommitLogIndex());
if (logger.isDebugEnabled()) {
logger.debug(
@@ -1117,8 +1118,9 @@ public abstract class RaftMember implements RaftMemberMBean {
}
// if a single log exceeds the threshold
// we need to return error code to the client as in server mode
- if (log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ if (config.isEnableRaftLogPersistence()
+ && (log.serialize().capacity() + Integer.BYTES
+ >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize())) {
logger.error(
"Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ "or reduce the size of requests you send.");
@@ -1166,8 +1168,9 @@ public abstract class RaftMember implements RaftMemberMBean {
}
// just like processPlanLocally,we need to check the size of log
- if (log.serialize().capacity() + Integer.BYTES
- >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+ if (config.isEnableRaftLogPersistence()
+ && (log.serialize().capacity() + Integer.BYTES
+ >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize())) {
logger.error(
"Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ "or reduce the size of requests you send.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index c794211..5a55f62 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -289,7 +289,6 @@ public class Timer {
int level;
Statistic parent;
List<Statistic> children = new ArrayList<>();
- long warningThreshold = 30 * 1000 * 1000 * 1000L;
Statistic(String className, String blockName, double scale, boolean valid, Statistic parent) {
this.className = className;
@@ -329,14 +328,6 @@ public class Timer {
if (ENABLE_INSTRUMENTING && startTime != Long.MIN_VALUE) {
long consumed = System.nanoTime() - startTime;
add(consumed);
- if (consumed >= warningThreshold && logger.isWarnEnabled()) {
- logger.warn(
- "Operation {}.{}.{} costs more than warning threshold: {}",
- className,
- blockName,
- name(),
- consumed);
- }
}
}
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 64914ba..df93e31 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -56,6 +56,7 @@ struct HeartBeatResponse {
// because a data server may play many data groups members, this is used to identify which
// member should process the request or response. Only used in data group communication.
7: optional Node header
+ 8: optional long commitIndex
}
struct RequestCommitIndexResponse {