You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/29 08:43:59 UTC

[iotdb] branch master updated: [IOTDB-5723] Pipe progress index: fix NumberFormatException when using IoTConsensus in DataRegion (#9965)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f4464c9441b [IOTDB-5723] Pipe progress index: fix NumberFormatException when using IoTConsensus in DataRegion (#9965)
f4464c9441b is described below

commit f4464c9441bebc157768bc8b8399c26514e55c73
Author: Potato <ta...@apache.org>
AuthorDate: Mon May 29 16:43:52 2023 +0800

    [IOTDB-5723] Pipe progress index: fix NumberFormatException when using IoTConsensus in DataRegion (#9965)
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../consensus/common/request/BatchIndexedConsensusRequest.java    | 6 +++---
 .../org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java    | 8 ++++----
 .../apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java   | 4 ++--
 .../thrift-iot-consensus/src/main/thrift/iotconsensus.thrift      | 2 +-
 .../statemachine/IoTConsensusDataRegionStateMachine.java          | 2 +-
 5 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
index 5b148b60ecb..658dc69d24e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -29,9 +29,9 @@ public class BatchIndexedConsensusRequest implements IConsensusRequest {
   private long startSyncIndex;
   private long endSyncIndex;
   private final List<IndexedConsensusRequest> requests;
-  private final String sourcePeerId;
+  private final int sourcePeerId;
 
-  public BatchIndexedConsensusRequest(String sourcePeerId) {
+  public BatchIndexedConsensusRequest(int sourcePeerId) {
     this.sourcePeerId = sourcePeerId;
     this.requests = new LinkedList<>();
     this.isStartSyncIndexInitialized = false;
@@ -54,7 +54,7 @@ public class BatchIndexedConsensusRequest implements IConsensusRequest {
     return endSyncIndex;
   }
 
-  public String getSourcePeerId() {
+  public int getSourcePeerId() {
     return sourcePeerId;
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 8c8badf4be6..ba9d2709ad2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -98,7 +98,7 @@ public class IoTConsensusServerImpl {
   private final Logger logger = LoggerFactory.getLogger(IoTConsensusServerImpl.class);
   private final Peer thisNode;
   private final IStateMachine stateMachine;
-  private final ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
+  private final ConcurrentHashMap<Integer, SyncLogCacheQueue> cacheQueueMap;
   private final Lock stateMachineLock = new ReentrantLock();
   private final Condition stateMachineCondition = stateMachineLock.newCondition();
   private final String storageDir;
@@ -789,7 +789,7 @@ public class IoTConsensusServerImpl {
     }
   }
 
-  public TSStatus syncLog(String sourcePeerId, IConsensusRequest request) {
+  public TSStatus syncLog(int sourcePeerId, IConsensusRequest request) {
     return cacheQueueMap
         .computeIfAbsent(sourcePeerId, SyncLogCacheQueue::new)
         .cacheAndInsertLatestNode((DeserializedBatchIndexedConsensusRequest) request);
@@ -805,13 +805,13 @@ public class IoTConsensusServerImpl {
    * deserialization of PlanNode to be concurrent
    */
   private class SyncLogCacheQueue {
-    private final String sourcePeerId;
+    private final int sourcePeerId;
     private final Lock queueLock = new ReentrantLock();
     private final Condition queueSortCondition = queueLock.newCondition();
     private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache;
     private long nextSyncIndex = -1;
 
-    public SyncLogCacheQueue(String sourcePeerId) {
+    public SyncLogCacheQueue(int sourcePeerId) {
       this.sourcePeerId = sourcePeerId;
       this.requestCache = new PriorityQueue<>();
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 8023ef88c27..a52ece6b3d8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -58,7 +58,7 @@ public class LogDispatcher {
   private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
   private final IoTConsensusServerImpl impl;
   private final List<LogDispatcherThread> threads;
-  private final String selfPeerId;
+  private final int selfPeerId;
   private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
   private ExecutorService executorService;
 
@@ -73,7 +73,7 @@ public class LogDispatcher {
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
       IoTConsensusServerMetrics ioTConsensusServerMetrics) {
     this.impl = impl;
-    this.selfPeerId = impl.getThisNode().getEndpoint().toString();
+    this.selfPeerId = impl.getThisNode().getNodeId();
     this.clientManager = clientManager;
     this.threads =
         impl.getConfiguration().stream()
diff --git a/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift b/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
index 70796f26d77..5810ff97a96 100644
--- a/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-iot-consensus/src/main/thrift/iotconsensus.thrift
@@ -30,7 +30,7 @@ struct TLogEntry {
 
 struct TSyncLogEntriesReq {
   # source peer where the TSyncLogEntriesReq is generated
-  1: required string peerId
+  1: required i32 peerId
   2: required common.TConsensusGroupId consensusGroupId
   3: required list<TLogEntry> logEntries
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
index 8fa8cb7c12b..04cb676d633 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/IoTConsensusDataRegionStateMachine.java
@@ -90,7 +90,7 @@ public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
         if (planNode instanceof ComparableConsensusRequest) {
           final IoTProgressIndex ioTProgressIndex = new IoTProgressIndex();
           ioTProgressIndex.addSearchIndex(
-              Integer.parseInt(batchRequest.getSourcePeerId()), indexedRequest.getSearchIndex());
+              batchRequest.getSourcePeerId(), indexedRequest.getSearchIndex());
           ((ComparableConsensusRequest) planNode).setProgressIndex(ioTProgressIndex);
         }
         deserializedRequest.add(planNode);