You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/29 08:43:59 UTC
[iotdb] branch master updated: [IOTDB-5723] Pipe progress index: fix NumberFormatException when using IoTConsensus in DataRegion (#9965)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f4464c9441b [IOTDB-5723] Pipe progress index: fix NumberFormatException when using IoTConsensus in DataRegion (#9965)
f4464c9441b is described below
commit f4464c9441bebc157768bc8b8399c26514e55c73
Author: Potato <ta...@apache.org>
AuthorDate: Mon May 29 16:43:52 2023 +0800
[IOTDB-5723] Pipe progress index: fix NumberFormatException when using IoTConsensus in DataRegion (#9965)
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../consensus/common/request/BatchIndexedConsensusRequest.java | 6 +++---
.../org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java | 8 ++++----
.../apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java | 4 ++--
.../thrift-iot-consensus/src/main/thrift/iotconsensus.thrift | 2 +-
.../statemachine/IoTConsensusDataRegionStateMachine.java | 2 +-
5 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
index 5b148b60ecb..658dc69d24e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -29,9 +29,9 @@ public class BatchIndexedConsensusRequest implements IConsensusRequest {
private long startSyncIndex;
private long endSyncIndex;
private final List<IndexedConsensusRequest> requests;
- private final String sourcePeerId;
+ private final int sourcePeerId;
- public BatchIndexedConsensusRequest(String sourcePeerId) {
+ public BatchIndexedConsensusRequest(int sourcePeerId) {
this.sourcePeerId = sourcePeerId;
this.requests = new LinkedList<>();
this.isStartSyncIndexInitialized = false;
@@ -54,7 +54,7 @@ public class BatchIndexedConsensusRequest implements IConsensusRequest {
return endSyncIndex;
}
- public String getSourcePeerId() {
+ public int getSourcePeerId() {
return sourcePeerId;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 8c8badf4be6..ba9d2709ad2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,7 +98,7 @@ public class IoTConsensusServerImpl {
private final Logger logger = LoggerFactory.getLogger(IoTConsensusServerImpl.class);
private final Peer thisNode;
private final IStateMachine stateMachine;
- private final ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
+ private final ConcurrentHashMap<Integer, SyncLogCacheQueue> cacheQueueMap;
private final Lock stateMachineLock = new ReentrantLock();
private final Condition stateMachineCondition = stateMachineLock.newCondition();
private final String storageDir;
@@ -789,7 +789,7 @@ public class IoTConsensusServerImpl {
}
}
- public TSStatus syncLog(String sourcePeerId, IConsensusRequest request) {
+ public TSStatus syncLog(int sourcePeerId, IConsensusRequest request) {
return cacheQueueMap
.computeIfAbsent(sourcePeerId, SyncLogCacheQueue::new)
.cacheAndInsertLatestNode((DeserializedBatchIndexedConsensusRequest) request);
@@ -805,13 +805,13 @@ public class IoTConsensusServerImpl {
* deserialization of PlanNode to be concurrent
*/
private class SyncLogCacheQueue {
- private final String sourcePeerId;
+ private final int sourcePeerId;
private final Lock queueLock = new ReentrantLock();
private final Condition queueSortCondition = queueLock.newCondition();
private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache;
private long nextSyncIndex = -1;
- public SyncLogCacheQueue(String sourcePeerId) {
+ public SyncLogCacheQueue(int sourcePeerId) {
this.sourcePeerId = sourcePeerId;
this.requestCache = new PriorityQueue<>();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 8023ef88c27..a52ece6b3d8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -58,7 +58,7 @@ public class LogDispatcher {
private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
private final IoTConsensusServerImpl impl;
private final List<LogDispatcherThread> threads;
- private final String selfPeerId;
+ private final int selfPeerId;
private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
private ExecutorService executorService;
@@ -73,7 +73,7 @@ public class LogDispatcher {
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
IoTConsensusServerMetrics ioTConsensusServerMetrics) {
this.impl = impl;
- this.selfPeerId = impl.getThisNode().getEndpoint().toString();
+ this.selfPeerId = impl.getThisNode().getNodeId();
this.clientManager = clientManager;
this.threads =
impl.getConfiguration().stream()
diff --git a/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift b/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
index 70796f26d77..5810ff97a96 100644
--- a/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
@@ -30,7 +30,7 @@ struct TLogEntry {
struct TSyncLogEntriesReq {
# source peer where the TSyncLogEntriesReq is generated
- 1: required string peerId
+ 1: required i32 peerId
2: required common.TConsensusGroupId consensusGroupId
3: required list<TLogEntry> logEntries
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
index 8fa8cb7c12b..04cb676d633 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -90,7 +90,7 @@ public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
if (planNode instanceof ComparableConsensusRequest) {
final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
ioTProgressIndex.addSearchIndex(
- Integer.parseInt(batchRequest.getSourcePeerId()), indexedRequest.getSearchIndex());
+ batchRequest.getSourcePeerId(), indexedRequest.getSearchIndex());
((ComparableConsensusRequest) planNode).setProgressIndex(ioTProgressIndex);
}
deserializedRequest.add(planNode);