You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/11 03:40:02 UTC
[iotdb] branch expr_plus updated: support appendEntriesIndirect
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_plus by this push:
new 991b1b2 support appendEntriesIndirect
991b1b2 is described below
commit 991b1b2fe98824b00bc6373cb5ef7d0982af8d2c
Author: jt <jt...@163.com>
AuthorDate: Fri Mar 11 11:39:15 2022 +0800
support appendEntriesIndirect
---
cluster/distribute-dc.sh | 10 +++
cluster/src/assembly/resources/sbin/expr-bench.sh | 89 ++++++++++++++++++++++
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 11 ++-
.../apache/iotdb/cluster/client/ClientManager.java | 11 +--
.../iotdb/cluster/client/ClientPoolFactory.java | 1 +
.../iotdb/cluster/config/ClusterConstant.java | 2 +-
.../iotdb/cluster/coordinator/Coordinator.java | 5 +-
.../org/apache/iotdb/cluster/expr/ExprBench.java | 35 ++++++++-
.../iotdb/cluster/log/IndirectLogDispatcher.java | 23 +++---
.../apache/iotdb/cluster/log/LogDispatcher.java | 2 +-
.../org/apache/iotdb/cluster/log/LogRelay.java | 41 ++++++++--
.../apache/iotdb/cluster/log/VotingLogList.java | 13 +++-
.../cluster/log/manage/CommittedEntryManager.java | 4 +-
.../iotdb/cluster/log/manage/RaftLogManager.java | 17 ++---
.../log/manage/UnCommittedEntryManager.java | 4 +
.../iotdb/cluster/query/ClusterPlanRouter.java | 28 ++++++-
.../handlers/caller/AppendNodeEntryHandler.java | 13 ++++
.../server/handlers/caller/HeartbeatHandler.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 4 +-
.../cluster/server/member/MetaGroupMember.java | 7 +-
.../iotdb/cluster/server/member/RaftMember.java | 82 +++++++++++---------
.../apache/iotdb/cluster/server/monitor/Timer.java | 3 +
.../cluster/server/service/BaseAsyncService.java | 12 ---
.../cluster/server/service/BaseSyncService.java | 10 ---
.../cluster/server/service/MetaAsyncService.java | 12 ---
.../cluster/server/service/MetaSyncService.java | 10 ---
.../apache/iotdb/cluster/utils/ClusterUtils.java | 4 +
.../apache/iotdb/cluster/utils/PartitionUtils.java | 5 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 1 +
.../apache/iotdb/db/qp/physical/sys/DummyPlan.java | 29 ++++++-
thrift-cluster/src/main/thrift/cluster.thrift | 10 +--
33 files changed, 368 insertions(+), 149 deletions(-)
diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
new file mode 100644
index 0000000..1eba6f4
--- /dev/null
+++ b/cluster/distribute-dc.sh
@@ -0,0 +1,10 @@
+# Copies the locally built iotdb jars into the lib directory of every benchmark host.
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
+target_lib_path=/home/jt/iotdb_expr/lib
+
+for ip in "${ips[@]}"
+ do
+ ssh "jt@$ip" "mkdir -p $target_lib_path" # -p: create parents, do not fail when it already exists
+ scp -r $src_lib_path "jt@$ip:$target_lib_path" # src_lib_path stays unquoted so its glob expands
+ done
\ No newline at end of file
diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh
new file mode 100644
index 0000000..00c9411
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-bench.sh
@@ -0,0 +1,89 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Launches org.apache.iotdb.cluster.expr.ExprBench; remaining arguments are passed to it as-is.
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false # an optional leading "printgc" argument is forwarded to iotdb-env.sh
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+  enable_printgc=true;
+  shift
+fi
+
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+  if [ $enable_printgc == "true" ]; then
+    . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+  else
+    . "$IOTDB_CONF/iotdb-env.sh"
+  fi
+elif [ -f "${IOTDB_HOME}/conf/iotdb-env.sh" ]; then
+  if [ $enable_printgc == "true" ]; then
+    . "${IOTDB_HOME}/conf/iotdb-env.sh" "printgc"
+  else
+    . "${IOTDB_HOME}/conf/iotdb-env.sh"
+  fi
+else
+  echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+  for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+    if [ -x "$java" ]; then
+      JAVA="$java"
+      break
+    fi
+  done
+else
+  JAVA=java
+fi
+
+if [ -z "$JAVA" ] ; then
+  echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. >&2
+  exit 1;
+fi
+
+CLASSPATH=""
+for f in "${IOTDB_HOME}"/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprBench
+
+launch_service()
+{
+  : "${IOTDB_CONF:=${IOTDB_HOME}/conf}" # same fallback directory used above for iotdb-env.sh
+  iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+  iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+  iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+  iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+  iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+  iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+  exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$@" # exec replaces this shell
+}
+
+# Start up the service
+launch_service "$classname" "$@"
+
+exit $?
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 10b6df8..36c40d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -288,11 +288,14 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
}
private String clusterConfigCheck() {
- try {
- ClusterDescriptor.getInstance().replaceHostnameWithIp();
- } catch (Exception e) {
- return String.format("replace hostname with ip failed, %s", e.getMessage());
+ if (IoTDBDescriptor.getInstance().getConfig().isReplaceHostNameWithIp()) {
+ try {
+ ClusterDescriptor.getInstance().replaceHostnameWithIp();
+ } catch (Exception e) {
+ return String.format("replace hostname with ip failed, %s", e.getMessage());
+ }
}
+
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
// check the initial replicateNum and refuse to start when the replicateNum <= 0
if (config.getReplicationNum() <= 0) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
index adc1574..bf918d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
@@ -50,8 +50,6 @@ public class ClientManager implements IClientManager {
private Map<ClientCategory, GenericKeyedObjectPool<Node, Client>> syncClientPoolMap;
private ClientPoolFactory clientPoolFactory;
- private Exception createStack;
-
/**
* {@link ClientManager.Type#RequestForwardClient} represents the clients used to forward external
* client requests to proper node to handle such as query, insert request.
@@ -78,8 +76,6 @@ public class ClientManager implements IClientManager {
syncClientPoolMap = Maps.newHashMap();
constructSyncClientMap(type);
}
-
- this.createStack = new Exception();
}
private void constructAsyncClientMap(Type type) {
@@ -164,7 +160,7 @@ public class ClientManager implements IClientManager {
"BorrowSyncClient invoke on unsupported mode or category: Node:{}, ClientCategory:{}, "
+ "isSyncMode:{}",
node,
- clientPoolFactory,
+ category,
syncClientPoolMap != null);
}
return client;
@@ -200,7 +196,7 @@ public class ClientManager implements IClientManager {
"BorrowSyncClient invoke on unsupported mode or category: Node:{}, ClientCategory:{}, "
+ "isSyncMode:{}",
node,
- clientPoolFactory,
+ category,
syncClientPoolMap != null);
}
return client;
@@ -231,9 +227,6 @@ public class ClientManager implements IClientManager {
@Override
public void close() {
- if (false) {
- return;
- }
if (asyncClientPoolMap != null) {
for (GenericKeyedObjectPool<Node, AsyncClient> value : asyncClientPoolMap.values()) {
value.close();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
index 0887992..89b8307 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
@@ -54,6 +54,7 @@ public class ClientPoolFactory {
: new TBinaryProtocol.Factory();
poolConfig = new GenericKeyedObjectPoolConfig();
poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
+ poolConfig.setMaxIdlePerKey(maxConnectionForEachNode);
poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
poolConfig.setTestOnReturn(true);
poolConfig.setTestOnBorrow(true);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index cb892ae..85ba9ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -53,7 +53,7 @@ public class ClusterConstant {
* every "REPORT_INTERVAL_SEC" seconds, a reporter thread will print the status of all raft
* members in this node.
*/
- public static final int REPORT_INTERVAL_SEC = 10;
+ public static final int REPORT_INTERVAL_SEC = 2;
/**
* during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 098b127..9d16670 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.ChangeMembershipException;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.metadata.CMManager;
@@ -424,10 +423,10 @@ public class Coordinator {
metaGroupMember.syncLeaderWithConsistencyCheck(true);
try {
planGroupMap = router.splitAndRoutePlan(plan);
- } catch (MetadataException | UnknownLogTypeException ex) {
+ } catch (MetadataException ex) {
// ignore
}
- } catch (MetadataException | UnknownLogTypeException e) {
+ } catch (MetadataException e) {
logger.error("Cannot route plan {}", plan, e);
}
logger.debug("route plan {} with partitionGroup {}", plan, planGroupMap);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 3a658d5..7673e11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -26,12 +26,16 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.thrift.TException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@@ -48,6 +52,8 @@ public class ExprBench {
private Node target;
private int maxRequestNum;
private ExecutorService pool = Executors.newCachedThreadPool();
+ private List<Node> nodeList = new ArrayList<>();
+ private int raftFactor = 1;
public ExprBench(Node target) {
this.target = target;
@@ -57,8 +63,10 @@ public class ExprBench {
public void benchmark() {
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadNum; i++) {
+ int finalI = i;
pool.submit(
() -> {
+ Random random = new Random(123456L + finalI);
Client client = null;
try {
client = clientPool.borrowSyncClient(target, ClientCategory.META);
@@ -69,13 +77,23 @@ public class ExprBench {
DummyPlan plan = new DummyPlan();
plan.setWorkload(new byte[workloadSize]);
plan.setNeedForward(true);
+
ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
- plan.serialize(byteBuffer);
- byteBuffer.flip();
- request.setPlanBytes(byteBuffer);
+
long currRequsetNum = -1;
while (true) {
+ if (raftFactor > 0) {
+ Node node = nodeList.get(random.nextInt(nodeList.size()));
+ int raftId = random.nextInt(raftFactor);
+ plan.setGroupIdentifier(ClusterUtils.nodeToString(node) + "#" + raftId);
+ }
+ byteBuffer.clear();
+ plan.serialize(byteBuffer);
+ byteBuffer.flip();
+ request.planBytes = byteBuffer;
+ request.setPlanBytesIsSet(true);
+
long reqLatency = System.nanoTime();
try {
client.executeNonQueryPlan(request);
@@ -108,6 +126,7 @@ public class ExprBench {
}
});
}
+ pool.shutdown();
}
public void setMaxRequestNum(int maxRequestNum) {
@@ -124,6 +143,16 @@ public class ExprBench {
bench.threadNum = Integer.parseInt(args[3]);
bench.workloadSize = Integer.parseInt(args[4]) * 1024;
bench.printInterval = Integer.parseInt(args[5]);
+ String[] nodesSplit = args[6].split(",");
+ for (String s : nodesSplit) {
+ String[] nodeSplit = s.split(":");
+ Node node = new Node();
+ node.setInternalIp(nodeSplit[0]);
+ node.setMetaPort(Integer.parseInt(nodeSplit[1]));
+ bench.nodeList.add(node);
+ }
+ bench.raftFactor = Integer.parseInt(args[7]);
+
bench.benchmark();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index cc05753..9c878df 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.cluster.log;
+import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -100,18 +102,15 @@ public class IndirectLogDispatcher extends LogDispatcher {
@Override
void sendLog(SendLogRequest logRequest) {
- Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- logRequest.getVotingLog().getLog().getCreateTime());
- member.sendLogToFollower(
- logRequest.getVotingLog(),
- receiver,
- logRequest.getLeaderShipStale(),
- logRequest.getNewLeaderTerm(),
- logRequest.getAppendEntryRequest(),
- logRequest.getQuorumSize(),
- directToIndirectFollowerMap.get(receiver));
- Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
- logRequest.getVotingLog().getLog().getCreateTime());
+ logRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(receiver));
+ super.sendLog(logRequest);
+ }
+
+ @Override
+ protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList,
+ List<SendLogRequest> currBatch, int firstIndex) {
+ return super.prepareRequest(logList, currBatch, firstIndex)
+ .setSubReceivers(directToIndirectFollowerMap.get(receiver));
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ca6e8ae..2410b2f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -362,7 +362,7 @@ public class LogDispatcher {
}
}
- private AppendEntriesRequest prepareRequest(
+ protected AppendEntriesRequest prepareRequest(
List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
AppendEntriesRequest request = new AppendEntriesRequest();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 3ed91f5..91339c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -20,9 +20,11 @@
package org.apache.iotdb.cluster.log;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -59,6 +61,10 @@ public class LogRelay {
}
public void offer(AppendEntryRequest request, List<Node> receivers) {
+ offer(new RelayEntry(request, receivers));
+ }
+
+ private void offer(RelayEntry entry) {
synchronized (entryHeap) {
while (entryHeap.size()
> ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
@@ -68,11 +74,15 @@ public class LogRelay {
Thread.currentThread().interrupt();
}
}
- entryHeap.add(new RelayEntry(request, receivers));
+ entryHeap.add(entry);
entryHeap.notifyAll();
}
}
+ public void offer(AppendEntriesRequest request, List<Node> receivers) {
+ offer(new RelayEntry(request, receivers));
+ }
+
private class RelayThread implements Runnable {
@Override
@@ -92,21 +102,37 @@ public class LogRelay {
}
}
- raftMember.sendLogToSubFollowers(relayEntry.request, relayEntry.receivers);
+ raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+ Statistic.RAFT_SEND_RELAY.add(1);
}
}
}
public static class RelayEntry implements Comparable<RelayEntry> {
- private AppendEntryRequest request;
+ private AppendEntryRequest singleRequest;
+ private AppendEntriesRequest batchRequest;
private List<Node> receivers;
public RelayEntry(AppendEntryRequest request, List<Node> receivers) {
- this.request = request;
+ this.singleRequest = request;
+ this.receivers = receivers;
+ }
+
+ public RelayEntry(AppendEntriesRequest request, List<Node> receivers) {
+ this.batchRequest = request;
this.receivers = receivers;
}
+ public long getIndex() {
+ if (singleRequest != null) {
+ return singleRequest.prevLogIndex;
+ } else if (batchRequest != null) {
+ return batchRequest.prevLogIndex;
+ }
+ return 0;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -116,17 +142,18 @@ public class LogRelay {
return false;
}
RelayEntry that = (RelayEntry) o;
- return Objects.equals(request, that.request);
+ return Objects.equals(singleRequest, that.singleRequest) && Objects.equals(batchRequest,
+ that.batchRequest);
}
@Override
public int hashCode() {
- return Objects.hash(request);
+ return Objects.hash(singleRequest, batchRequest); // hash the same fields that equals() compares
}
@Override
public int compareTo(RelayEntry o) {
- return Long.compare(this.request.prevLogIndex, o.request.prevLogIndex);
+ return Long.compare(this.getIndex(), o.getIndex());
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index f5cce6c..59bc40b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -19,6 +19,10 @@
package org.apache.iotdb.cluster.log;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.tsfile.utils.Pair;
+
import java.util.ArrayList;
import java.util.List;
@@ -27,9 +31,11 @@ public class VotingLogList {
private List<VotingLog> logList = new ArrayList<>();
private volatile long currTerm = -1;
private int quorumSize;
+ private RaftMember member;
- public VotingLogList(int quorumSize) {
+ public VotingLogList(int quorumSize, RaftMember member) {
this.quorumSize = quorumSize;
+ this.member = member;
}
/**
@@ -92,6 +98,11 @@ public class VotingLogList {
acceptedLog.acceptedTime.set(System.nanoTime());
acceptedLog.notifyAll();
}
+ if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+ member.removeAppendLogHandler(
+ new Pair<>(
+ acceptedLog.getLog().getCurrLogIndex(), acceptedLog.getLog().getCurrLogTerm()));
+ }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index 77be5b3..85b618d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -79,8 +79,8 @@ public class CommittedEntryManager {
logger.info("requested snapshot is older than the existing snapshot");
return;
}
- entries.clear();
- entries.add(new EmptyContentLog(snapshot.getLastLogIndex(), snapshot.getLastLogTerm()));
+ entries.subList(1, entries.size()).clear();
+ entries.set(0, new EmptyContentLog(snapshot.getLastLogIndex(), snapshot.getLastLogTerm()));
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index b75f1e7..4b23092 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -325,7 +325,7 @@ public abstract class RaftLogManager {
public long getCommitLogTerm() {
long term = -1;
try {
- term = getTerm(getCommitLogIndex());
+ term = getCommittedEntryManager().getLastTerm();
} catch (Exception e) {
logger.error("{}: unexpected error when getting the last term : {}", name, e.getMessage());
}
@@ -519,19 +519,18 @@ public abstract class RaftLogManager {
name,
snapshot.getLastLogIndex(),
snapshot.getLastLogTerm());
- try {
- getCommittedEntryManager().compactEntries(snapshot.getLastLogIndex());
- getStableEntryManager().removeCompactedEntries(snapshot.getLastLogIndex());
- } catch (EntryUnavailableException e) {
- getCommittedEntryManager().applyingSnapshot(snapshot);
- getUnCommittedEntryManager().applyingSnapshot(snapshot);
- }
+
+ getUnCommittedEntryManager().applyingSnapshot(snapshot);
+ getCommittedEntryManager().applyingSnapshot(snapshot);
+
if (this.commitIndex < snapshot.getLastLogIndex()) {
this.commitIndex = snapshot.getLastLogIndex();
}
// as the follower receives a snapshot, the logs persisted is not complete, so remove them
- getStableEntryManager().clearAllLogs(commitIndex);
+ if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+ getStableEntryManager().clearAllLogs(commitIndex);
+ }
synchronized (changeApplyCommitIndexCond) {
if (this.maxHaveAppliedCommitIndex < snapshot.getLastLogIndex()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index 80bf2f8..1c63bff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -288,4 +288,8 @@ public class UnCommittedEntryManager {
List<Log> getAllEntries() {
return entries;
}
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 8b1ad59..5c56677 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.query;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
@@ -27,6 +26,8 @@ import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -42,6 +43,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -117,7 +119,7 @@ public class ClusterPlanRouter {
}
public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
- throws UnsupportedPlanException, MetadataException, UnknownLogTypeException {
+ throws UnsupportedPlanException, MetadataException {
if (plan instanceof InsertRowsPlan) {
return splitAndRoutePlan((InsertRowsPlan) plan);
} else if (plan instanceof InsertTabletPlan) {
@@ -136,6 +138,8 @@ public class ClusterPlanRouter {
return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
} else if (plan instanceof CreateMultiTimeSeriesPlan) {
return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
+ } else if (plan instanceof DummyPlan) {
+ return splitAndRoutePlan((DummyPlan) plan);
}
// the if clause can be removed after the program is stable
if (PartitionUtils.isLocalNonQueryPlan(plan)) {
@@ -172,6 +176,26 @@ public class ClusterPlanRouter {
return result;
}
+ private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(DummyPlan plan) {
+ List<Node> allNodes = partitionTable.getAllNodes();
+ String groupIdentifier = plan.getGroupIdentifier();
+ String[] split = groupIdentifier.split(DummyPlan.GROUP_ID_SEPARATOR);
+ Node node = ClusterUtils.parseNode(split[0]);
+ int raftId = Integer.parseInt(split[1]);
+ Node innerNode = null;
+ for (Node n : allNodes) {
+ if (ClusterUtils.isNodeEquals(node, n)) {
+ innerNode = n;
+ break;
+ }
+ }
+ if (innerNode == null) {
+ return null;
+ }
+ RaftNode raftNode = new RaftNode(innerNode, raftId);
+ return Collections.singletonMap(plan, partitionTable.getPartitionGroup(raftNode));
+ }
+
private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowPlan plan)
throws MetadataException {
PartitionGroup partitionGroup =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 9cae7e8..7e703a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -113,12 +114,20 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
synchronized (log) {
log.notifyAll();
}
+ if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+ member.removeAppendLogHandler(
+ new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
+ }
} else if (resp == RESPONSE_WEAK_ACCEPT) {
synchronized (log) {
log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
>= quorumSize) {
log.acceptedTime.set(System.nanoTime());
+ if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+ member.removeAppendLogHandler(
+ new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
+ }
}
log.notifyAll();
}
@@ -160,6 +169,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
// quorum members have failed, there is no need to wait for others
log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
log.notifyAll();
+ if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+ member.removeAppendLogHandler(
+ new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
+ }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 6eb735d..af4fbc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -119,7 +119,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
if (lastLogIdx == peer.getLastHeartBeatIndex()) {
// the follower's lastLogIndex is unchanged, increase inconsistent counter
int inconsistentNum = peer.incInconsistentHeartbeatNum();
- if (inconsistentNum >= 5) {
+ if (inconsistentNum >= 10) {
logger.info(
"{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 2a80781..c43401a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -406,7 +406,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
if (removedNode.equals(leader.get()) && !removedNode.equals(thisNode)) {
// if the leader is removed, also start an election immediately
- synchronized (term) {
+ synchronized (logManager) {
setCharacter(NodeCharacter.ELECTOR);
setLeader(null);
}
@@ -962,7 +962,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
peerMap.remove(removedNode);
if (removedNode.equals(leader.get())) {
// if the leader is removed, also start an election immediately
- synchronized (term) {
+ synchronized (logManager) {
setCharacter(NodeCharacter.ELECTOR);
setLeader(null);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 841b32b..e07452c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1317,8 +1317,9 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
result = processNonPartitionedMetaPlan(plan);
} else {
// do nothing
- logger.warn("receive a plan {} could not be processed in local", plan);
- result = StatusUtils.UNSUPPORTED_OPERATION;
+ // logger.warn("receive a plan {} could not be processed in local", plan);
+ // result = StatusUtils.UNSUPPORTED_OPERATION;
+ result = coordinator.executeNonQueryPlan(plan);
}
Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
return result;
@@ -1708,7 +1709,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
// the leader is removed, start the next election ASAP
if (oldNode.equals(leader.get()) && !oldNode.equals(thisNode)) {
- synchronized (term) {
+ synchronized (logManager) {
setCharacter(NodeCharacter.ELECTOR);
setLeader(null);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index a28f220..0c227c5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.server.member;
+import java.nio.Buffer;
import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.client.ClientCategory;
import org.apache.iotdb.cluster.client.ClientManager;
@@ -96,6 +97,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
@@ -436,7 +438,7 @@ public abstract class RaftMember implements RaftMemberMBean {
*/
public HeartBeatResponse processHeartbeatRequest(HeartBeatRequest request) {
logger.trace("{} received a heartbeat", name);
- synchronized (term) {
+ synchronized (logManager) {
long thisTerm = term.get();
long leaderTerm = request.getTerm();
HeartBeatResponse response = new HeartBeatResponse();
@@ -520,7 +522,7 @@ public abstract class RaftMember implements RaftMemberMBean {
logger.debug(
"{}: start to handle request from elector {}", name, electionRequest.getElector());
}
- synchronized (term) {
+ synchronized (logManager) {
long currentTerm = term.get();
long response =
checkElectorTerm(currentTerm, electionRequest.getTerm(), electionRequest.getElector());
@@ -576,6 +578,15 @@ public abstract class RaftMember implements RaftMemberMBean {
* finally see if we can find a position to append the log.
*/
public AppendEntryResult appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+ AppendEntryResult result = appendEntryInternal(request);
+ if (request.isSetSubReceivers()) {
+ request.entry.rewind();
+ logRelay.offer(request, request.subReceivers);
+ }
+ return result;
+ }
+
+ private AppendEntryResult appendEntryInternal(AppendEntryRequest request) throws UnknownLogTypeException {
logger.debug("{} received an AppendEntryRequest: {}", name, request);
// the term checked here is that of the leader, not that of the log
long checkResult = checkRequestTerm(request.term, request.leader);
@@ -598,6 +609,7 @@ public abstract class RaftMember implements RaftMemberMBean {
if (!request.isFromLeader) {
appendAckLeader(request.leader, log, result.status);
+ Statistic.RAFT_SEND_RELAY_ACK.add(1);
}
return result;
@@ -661,6 +673,17 @@ public abstract class RaftMember implements RaftMemberMBean {
/** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
public AppendEntryResult appendEntries(AppendEntriesRequest request)
throws UnknownLogTypeException {
+ AppendEntryResult result = appendEntriesInternal(request);
+ if (request.isSetSubReceivers()) {
+ request.entries.forEach(Buffer::rewind);
+ logRelay.offer(request, request.subReceivers);
+ }
+ return result;
+ }
+
+ /** Similar to appendEntryInternal, while the incoming load is a batch of logs instead of a single log. */
+ private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
+ throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
// the term checked here is that of the leader, not that of the log
@@ -716,18 +739,13 @@ public abstract class RaftMember implements RaftMemberMBean {
AtomicLong newLeaderTerm,
AppendEntryRequest request,
Peer peer,
- int quorumSize,
- List<Node> indirectReceivers) {
+ int quorumSize) {
AsyncClient client = getSendLogAsyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
try {
- if (indirectReceivers == null || indirectReceivers.isEmpty()) {
- client.appendEntry(request, handler);
- } else {
- client.appendEntryIndirect(request, indirectReceivers, handler);
- }
+ client.appendEntry(request, handler);
logger.debug("{} sending a log to {}: {}", name, node, log);
} catch (Exception e) {
logger.warn("{} cannot append log to node {}", name, node, e);
@@ -796,7 +814,7 @@ public abstract class RaftMember implements RaftMemberMBean {
return;
}
- this.votingLogList = new VotingLogList(allNodes.size() / 2);
+ this.votingLogList = new VotingLogList(allNodes.size() / 2, this);
// update the reference of thisNode to keep consistency
boolean foundThisNode = false;
@@ -1686,7 +1704,8 @@ public abstract class RaftMember implements RaftMemberMBean {
return false;
}
PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
- return physicalPlanLog.getPlan() instanceof InsertPlan;
+ return physicalPlanLog.getPlan() instanceof InsertPlan
+ || physicalPlanLog.getPlan() instanceof DummyPlan;
}
/**
@@ -1904,7 +1923,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* elector.
*/
public void stepDown(long newTerm, boolean fromLeader) {
- synchronized (term) {
+ synchronized (logManager) {
long currTerm = term.get();
// confirm that the heartbeat of the new leader hasn't come
if (currTerm < newTerm) {
@@ -1922,6 +1941,10 @@ public abstract class RaftMember implements RaftMemberMBean {
setCharacter(NodeCharacter.FOLLOWER);
lastHeartbeatReceivedTime = System.currentTimeMillis();
}
+
+ if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+ sentLogHandlers.clear();
+ }
}
}
@@ -2093,17 +2116,6 @@ public abstract class RaftMember implements RaftMemberMBean {
return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
}
- public void sendLogToFollower(
- VotingLog log,
- Node node,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- AppendEntryRequest request,
- int quorumSize) {
- sendLogToFollower(
- log, node, leaderShipStale, newLeaderTerm, request, quorumSize, Collections.emptyList());
- }
-
/** Send "log" to "node". */
public void sendLogToFollower(
VotingLog log,
@@ -2111,8 +2123,7 @@ public abstract class RaftMember implements RaftMemberMBean {
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- int quorumSize,
- List<Node> indirectReceivers) {
+ int quorumSize) {
if (node.equals(thisNode)) {
return;
}
@@ -2134,10 +2145,10 @@ public abstract class RaftMember implements RaftMemberMBean {
if (config.isUseAsyncServer()) {
sendLogAsync(
- log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize, indirectReceivers);
+ log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
} else {
sendLogSync(
- log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize, indirectReceivers);
+ log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
}
}
@@ -2176,8 +2187,7 @@ public abstract class RaftMember implements RaftMemberMBean {
AtomicLong newLeaderTerm,
AppendEntryRequest request,
Peer peer,
- int quorumSize,
- List<Node> indirectReceivers) {
+ int quorumSize) {
Client client = getSyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
@@ -2185,12 +2195,7 @@ public abstract class RaftMember implements RaftMemberMBean {
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
- AppendEntryResult result;
- if (indirectReceivers == null || indirectReceivers.isEmpty()) {
- result = client.appendEntry(request);
- } else {
- result = client.appendEntryIndirect(request, indirectReceivers);
- }
+ AppendEntryResult result = client.appendEntry(request);
Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
handler.onComplete(result);
@@ -2222,6 +2227,10 @@ public abstract class RaftMember implements RaftMemberMBean {
handler.setPeer(peer);
handler.setReceiverTerm(newLeaderTerm);
handler.setQuorumSize(quorumSize);
+ if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+ registerAppendLogHandler(
+ new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()), handler);
+ }
return handler;
}
@@ -2243,7 +2252,7 @@ public abstract class RaftMember implements RaftMemberMBean {
private long checkRequestTerm(long leaderTerm, Node leader) {
long localTerm;
- synchronized (term) {
+ synchronized (logManager) {
// if the request comes before the heartbeat arrives, the local term may be smaller than the
// leader term
localTerm = term.get();
@@ -2288,6 +2297,7 @@ public abstract class RaftMember implements RaftMemberMBean {
sentLogHandlers.get(new Pair<>(ack.lastLogIndex, ack.lastLogTerm));
if (appendNodeEntryHandler != null) {
appendNodeEntryHandler.onComplete(ack);
+ Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 05f262c..4169935 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -284,6 +284,9 @@ public class Timer {
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
+ RAFT_SEND_RELAY(RAFT_MEMBER_RECEIVER, "send relay entries", 1, true, ROOT),
+ RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+ RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index b7987f6..a521bb3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -185,16 +185,4 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
member.acknowledgeAppendLog(ack);
resultHandler.onComplete(null);
}
-
- @Override
- public void appendEntryIndirect(
- AppendEntryRequest request,
- List<Node> subReceivers,
- AsyncMethodCallback<AppendEntryResult> resultHandler) {
- try {
- resultHandler.onComplete(member.appendEntryIndirect(request, subReceivers));
- } catch (UnknownLogTypeException e) {
- resultHandler.onError(e);
- }
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index b812404..a8f2704 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -165,14 +165,4 @@ public abstract class BaseSyncService implements RaftService.Iface {
public void acknowledgeAppendEntry(AppendEntryResult ack) {
member.acknowledgeAppendLog(ack);
}
-
- @Override
- public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
- throws TException {
- try {
- return member.appendEntryIndirect(request, subReceivers);
- } catch (UnknownLogTypeException e) {
- throw new TException(e);
- }
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 5ef57e8..32cba96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -260,18 +260,6 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
}
@Override
- public void appendEntryIndirect(
- AppendEntryRequest request,
- List<Node> subReceivers,
- AsyncMethodCallback<AppendEntryResult> resultHandler) {
- try {
- resultHandler.onComplete(metaGroupMember.appendEntryIndirect(request, subReceivers));
- } catch (UnknownLogTypeException e) {
- resultHandler.onError(e);
- }
- }
-
- @Override
public void acknowledgeAppendEntry(
AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler) {
metaGroupMember.acknowledgeAppendLog(ack);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 93a0ba6..aae1cb7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -255,16 +255,6 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
}
@Override
- public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
- throws TException {
- try {
- return metaGroupMember.appendEntryIndirect(request, subReceivers);
- } catch (UnknownLogTypeException e) {
- throw new TException(e);
- }
- }
-
- @Override
public void acknowledgeAppendEntry(AppendEntryResult ack) {
metaGroupMember.acknowledgeAppendLog(ack);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 49cfdbc..5b73575 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -384,4 +384,8 @@ public class ClusterUtils {
ClusterNode clusterNode2 = new ClusterNode(node2);
return clusterNode1.equals(clusterNode2);
}
+
+ public static String nodeToString(Node node) {
+ return node.getInternalIp() + ":" + node.getMetaPort();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 0282b9d..2e0b000 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -103,7 +104,9 @@ public class PartitionUtils {
|| plan instanceof CreateFunctionPlan
|| plan instanceof DropFunctionPlan
|| plan instanceof CreateSnapshotPlan
- || plan instanceof SetSystemModePlan;
+ || plan instanceof SetSystemModePlan
+ || (plan instanceof DummyPlan
+ && DummyPlan.META_GROUP_ID.equals(((DummyPlan) plan).getGroupIdentifier()));
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4842bb2..8f68b46 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -802,6 +802,11 @@ public class IoTDBConfig {
*/
private boolean enableIDTableLogFile = false;
+ /**
+ * to avoid the situation that in some places the hostname is used while in others the IP is used.
+ */
+ private boolean replaceHostNameWithIp = false;
+
public IoTDBConfig() {
// empty constructor
}
@@ -2515,4 +2520,12 @@ public class IoTDBConfig {
public void setEnableIDTableLogFile(boolean enableIDTableLogFile) {
this.enableIDTableLogFile = enableIDTableLogFile;
}
+
+ public boolean isReplaceHostNameWithIp() {
+ return replaceHostNameWithIp;
+ }
+
+ public void setReplaceHostNameWithIp(boolean replaceHostNameWithIp) {
+ this.replaceHostNameWithIp = replaceHostNameWithIp;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 849fddb..bb062a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -136,7 +136,9 @@ public class IoTDBDescriptor {
"enable_monitor_series_write", Boolean.toString(conf.isEnableStatMonitor()))));
conf.setRpcAddress(properties.getProperty("rpc_address", conf.getRpcAddress()));
- replaceHostnameWithIP();
+ if (conf.isReplaceHostNameWithIp()) {
+ replaceHostnameWithIP();
+ }
conf.setRpcThriftCompressionEnable(
Boolean.parseBoolean(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 0b4770b..918a5a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -539,6 +539,7 @@ public abstract class PhysicalPlan {
DROP_FUNCTION,
SELECT_INTO,
DUMMY,
+ DUMMY_CLUSTER,
SET_SYSTEM_MODE,
UNSET_TEMPLATE,
APPEND_TEMPLATE,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
index 42e701f..ff93076 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
@@ -32,8 +32,12 @@ import java.util.List;
public class DummyPlan extends PhysicalPlan {
- private byte[] workload;
- private boolean needForward;
+ public static final String GROUP_ID_SEPARATOR = "#";
+ public static final String META_GROUP_ID = "meta";
+
+ protected byte[] workload;
+ protected boolean needForward;
+ protected String groupIdentifier = META_GROUP_ID;
public DummyPlan() {
super(OperatorType.EMPTY);
@@ -52,6 +56,10 @@ public class DummyPlan extends PhysicalPlan {
stream.write(workload);
}
stream.write(needForward ? 1 : 0);
+
+ byte[] bytes = groupIdentifier.getBytes();
+ stream.writeInt(bytes.length);
+ stream.write(bytes);
}
@Override
@@ -62,6 +70,10 @@ public class DummyPlan extends PhysicalPlan {
buffer.put(workload);
}
buffer.put(needForward ? (byte) 1 : 0);
+
+ byte[] bytes = groupIdentifier.getBytes();
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
}
@Override
@@ -70,6 +82,11 @@ public class DummyPlan extends PhysicalPlan {
workload = new byte[size];
buffer.get(workload);
needForward = buffer.get() == 1;
+
+ int groupIdSize = buffer.getInt();
+ byte[] bytes = new byte[groupIdSize];
+ buffer.get(bytes);
+ groupIdentifier = new String(bytes);
}
public void setWorkload(byte[] workload) {
@@ -88,6 +105,14 @@ public class DummyPlan extends PhysicalPlan {
return workload;
}
+ public String getGroupIdentifier() {
+ return groupIdentifier;
+ }
+
+ public void setGroupIdentifier(String groupIdentifier) {
+ this.groupIdentifier = groupIdentifier;
+ }
+
@Override
public String toString() {
return "ExprPlan";
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index df93e31..ea8edf7 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -92,6 +92,7 @@ struct AppendEntryRequest {
// true if the request is sent from the leader, and the reiceiver just responds to the sender;
// otherwise, the reiceiver should also notify the leader
8: optional bool isFromLeader
+ 9: optional list<Node> subReceivers
}
@@ -107,6 +108,10 @@ struct AppendEntriesRequest {
// because a data server may play many data groups members, this is used to identify which
// member should process the request or response. Only used in data group communication.
7: optional RaftNode header
+ // true if the request is sent from the leader, and the receiver just responds to the sender;
+ // otherwise, the receiver should also notify the leader
+ 8: optional bool isFromLeader
+ 9: optional list<Node> subReceivers
}
struct AddNodeResponse {
@@ -325,11 +330,6 @@ service RaftService {
**/
AppendEntryResult appendEntry(1:AppendEntryRequest request)
- /**
- * Like appendEntry, but the receiver should forward the request to nodes in subReceivers.
- **/
- AppendEntryResult appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers)
-
void sendSnapshot(1:SendSnapshotRequest request)
/**