You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/07/16 09:31:17 UTC

[iotdb] 01/02: fix

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch autoai_cluster
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a11a5611d2fb11607df9beb9edcfd1c69f9e0ca0
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Jul 15 19:03:39 2021 +0800

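    fix: close sync data client connections that may be broken after a TException so they are not reused, and throw IOException instead of TException when a sync data client cannot be obtained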
---
 .../iotdb/cluster/client/DataClientProvider.java   |  9 ++-
 .../iotdb/cluster/coordinator/Coordinator.java     | 17 ++----
 .../apache/iotdb/cluster/metadata/CMManager.java   | 71 ++++++++++++++++------
 .../apache/iotdb/cluster/metadata/MetaPuller.java  | 18 ++++--
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 39 +++++++++---
 .../cluster/query/aggregate/ClusterAggregator.java |  9 ++-
 .../cluster/query/fill/ClusterPreviousFill.java    | 25 +++++---
 .../query/groupby/RemoteGroupByExecutor.java       | 21 +++++--
 .../query/last/ClusterLastQueryExecutor.java       | 26 +++++---
 .../cluster/query/reader/ClusterReaderFactory.java |  8 ++-
 .../iotdb/cluster/query/reader/DataSourceInfo.java | 36 ++++++-----
 .../reader/RemoteSeriesReaderByTimestamp.java      |  2 +
 .../query/reader/RemoteSimpleSeriesReader.java     |  2 +
 .../query/reader/mult/MultDataSourceInfo.java      | 15 +++--
 .../query/reader/mult/RemoteMultSeriesReader.java  | 17 +++---
 .../apache/iotdb/cluster/server/ClientServer.java  |  8 ++-
 .../cluster/server/heartbeat/HeartbeatThread.java  |  7 +++
 .../cluster/server/member/MetaGroupMember.java     | 15 +++--
 .../cluster/client/DataClientProviderTest.java     |  5 +-
 19 files changed, 239 insertions(+), 111 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 106705f..0950958 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 
-import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.io.IOException;
@@ -102,10 +101,10 @@ public class DataClientProvider {
    * @param node the node to be connected
    * @param timeout timeout threshold of connection
    */
-  public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
+  public SyncDataClient getSyncDataClient(Node node, int timeout) throws IOException {
     SyncDataClient client = (SyncDataClient) getDataSyncClientPool().getClient(node);
     if (client == null) {
-      throw new TException(GET_CLIENT_FAILED_MSG + node);
+      throw new IOException(GET_CLIENT_FAILED_MSG + node);
     }
     client.setTimeout(timeout);
     return client;
@@ -121,10 +120,10 @@ public class DataClientProvider {
    * @param node the node to be connected
    * @param timeout timeout threshold of connection
    */
-  public SyncDataClient getSyncDataClientForRefresh(Node node, int timeout) throws TException {
+  public SyncDataClient getSyncDataClientForRefresh(Node node, int timeout) throws IOException {
     SyncDataClient client = (SyncDataClient) getDataSyncClientPool().getClientForRefresh(node);
     if (client == null) {
-      throw new TException(GET_CLIENT_FAILED_MSG + node);
+      throw new IOException(GET_CLIENT_FAILED_MSG + node);
     }
     client.setTimeout(timeout);
     return client;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 11f99e8..db0fce3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -58,7 +58,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -703,15 +702,11 @@ public class Coordinator {
 
   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
       throws IOException {
-    RaftService.Client client = null;
-    try {
-      client =
-          metaGroupMember
-              .getClientProvider()
-              .getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
-    } catch (TException e) {
-      throw new IOException(e);
-    }
+    RaftService.Client client =
+        metaGroupMember
+            .getClientProvider()
+            .getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
+
     return this.metaGroupMember.forwardPlanSync(plan, receiver, header, client);
   }
 
@@ -735,7 +730,7 @@ public class Coordinator {
    * @param node the node to be connected
    * @param timeout timeout threshold of connection
    */
-  public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
+  public SyncDataClient getSyncDataClient(Node node, int timeout) throws IOException {
     return metaGroupMember.getClientProvider().getSyncDataClient(node, timeout);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 319159a..05d61a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -794,8 +794,14 @@ public class CMManager extends MManager {
               metaGroupMember
                   .getClientProvider()
                   .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-            result =
-                syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+            try {
+              result =
+                  syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+            } catch (TException e) {
+              // the connection may be broken, close it to avoid it being reused
+              syncDataClient.getInputProtocol().getTransport().close();
+              throw e;
+            }
           }
         }
         if (result != null) {
@@ -978,13 +984,18 @@ public class CMManager extends MManager {
           metaGroupMember
               .getClientProvider()
               .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
-        PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
-        ByteBuffer buffer = pullSchemaResp.schemaBytes;
-        int size = buffer.getInt();
-        schemas = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-          schemas.add(TimeseriesSchema.deserializeFrom(buffer));
+        try {
+          PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
+          ByteBuffer buffer = pullSchemaResp.schemaBytes;
+          int size = buffer.getInt();
+          schemas = new ArrayList<>(size);
+          for (int i = 0; i < size; i++) {
+            schemas.add(TimeseriesSchema.deserializeFrom(buffer));
+          }
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
         }
       }
     }
@@ -1212,8 +1223,13 @@ public class CMManager extends MManager {
           metaGroupMember
               .getClientProvider()
               .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
-        result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+        try {
+          result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
+        }
       }
     }
 
@@ -1338,8 +1354,13 @@ public class CMManager extends MManager {
           metaGroupMember
               .getClientProvider()
               .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
-        paths = syncDataClient.getAllDevices(header, pathsToQuery);
+        try {
+          paths = syncDataClient.getAllDevices(header, pathsToQuery);
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
+        }
       }
     }
     return paths;
@@ -1792,9 +1813,15 @@ public class CMManager extends MManager {
                   .getClientProvider()
                   .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
         plan.serialize(dataOutputStream);
-        resultBinary =
-            syncDataClient.getAllMeasurementSchema(
-                group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+        try {
+          resultBinary =
+              syncDataClient.getAllMeasurementSchema(
+                  group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
+        }
       }
     }
     return resultBinary;
@@ -1818,9 +1845,15 @@ public class CMManager extends MManager {
                   .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
 
         plan.serialize(dataOutputStream);
-        resultBinary =
-            syncDataClient.getDevices(
-                group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+        try {
+          resultBinary =
+              syncDataClient.getDevices(
+                  group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
+        }
       }
     }
     return resultBinary;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e524772..9991c5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -231,12 +231,18 @@ public class MetaPuller {
               .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
 
         // only need measurement name
-        PullSchemaResp pullSchemaResp = syncDataClient.pullMeasurementSchema(request);
-        ByteBuffer buffer = pullSchemaResp.schemaBytes;
-        int size = buffer.getInt();
-        schemas = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-          schemas.add(MeasurementSchema.deserializeFrom(buffer));
+        try {
+          PullSchemaResp pullSchemaResp = syncDataClient.pullMeasurementSchema(request);
+          ByteBuffer buffer = pullSchemaResp.schemaBytes;
+          int size = buffer.getInt();
+          schemas = new ArrayList<>(size);
+          for (int i = 0; i < size; i++) {
+            schemas.add(MeasurementSchema.deserializeFrom(buffer));
+          }
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index ca16cfb..34e9904 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -259,8 +259,13 @@ public class ClusterPlanExecutor extends PlanExecutor {
               metaGroupMember
                   .getClientProvider()
                   .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-            syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
-            count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+            try {
+              count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+            } catch (TException e) {
+              // the connection may be broken, close it to avoid it being reused
+              syncDataClient.getInputProtocol().getTransport().close();
+              throw e;
+            }
           }
         }
         logger.debug(
@@ -363,8 +368,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
               metaGroupMember
                   .getClientProvider()
                   .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-            paths =
-                syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+            try {
+              paths =
+                  syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+            } catch (TException e) {
+              // the connection may be broken, close it to avoid it being reused
+              syncDataClient.getInputProtocol().getTransport().close();
+              throw e;
+            }
           }
         }
         if (paths != null) {
@@ -449,8 +460,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
               metaGroupMember
                   .getClientProvider()
                   .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-            nextChildrenNodes =
-                syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
+            try {
+              nextChildrenNodes =
+                  syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
+            } catch (TException e) {
+              // the connection may be broken, close it to avoid it being reused
+              syncDataClient.getInputProtocol().getTransport().close();
+              throw e;
+            }
           }
         }
         if (nextChildrenNodes != null) {
@@ -558,8 +575,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
               metaGroupMember
                   .getClientProvider()
                   .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-            nextChildren =
-                syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+            try {
+              nextChildren =
+                  syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+            } catch (TException e) {
+              // the connection may be broken, close it to avoid it being reused
+              syncDataClient.getInputProtocol().getTransport().close();
+              throw e;
+            }
           }
         }
         if (nextChildren != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index 6c33f50..dc52923 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -274,8 +274,13 @@ public class ClusterAggregator {
           metaGroupMember
               .getClientProvider()
               .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
-        resultBuffers = syncDataClient.getAggrResult(request);
+        try {
+          resultBuffers = syncDataClient.getAggrResult(request);
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
+        }
       }
     }
     return resultBuffers;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 33274e3..9af7082 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.handlers.caller.PreviousFillHandler;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -41,6 +42,7 @@ import org.apache.iotdb.db.query.executor.fill.PreviousFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,19 +242,28 @@ public class ClusterPreviousFill extends PreviousFill {
   private ByteBuffer remoteSyncPreviousFill(
       Node node, PreviousFillRequest request, PreviousFillArguments arguments) {
     ByteBuffer byteBuffer = null;
-    try (SyncDataClient syncDataClient =
-        metaGroupMember
-            .getClientProvider()
-            .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
-      byteBuffer = syncDataClient.previousFill(request);
-    } catch (Exception e) {
+    SyncDataClient client = null;
+    try {
+      client =
+          metaGroupMember
+              .getClientProvider()
+              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+      byteBuffer = client.previousFill(request);
+    } catch (IOException e) {
+      logger.warn("{}: Cannot connect to {} during previous fill", metaGroupMember, node);
+    } catch (TException e) {
       logger.error(
           "{}: Cannot perform previous fill of {} to {}",
           metaGroupMember.getName(),
           arguments.getPath(),
           node,
           e);
+      // the connection may be broken, close it to avoid it being reused
+      client.getInputProtocol().getTransport().close();
+    } finally {
+      if (client != null) {
+        ClientUtils.putBackSyncClient(client);
+      }
     }
     return byteBuffer;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index 02df747..468ec57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -89,8 +89,14 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
                 .getClientProvider()
                 .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS())) {
 
-          aggrBuffers =
-              syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+          try {
+            aggrBuffers =
+                syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+          } catch (TException e) {
+            // the connection may be broken, close it to avoid it being reused
+            syncDataClient.getInputProtocol().getTransport().close();
+            throw e;
+          }
         }
       }
     } catch (TException e) {
@@ -133,9 +139,14 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
             metaGroupMember
                 .getClientProvider()
                 .getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS())) {
-
-          aggrBuffer =
-              syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+          try {
+            aggrBuffer =
+                syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+          } catch (TException e) {
+            // the connection may be broken, close it to avoid it being reused
+            syncDataClient.getInputProtocol().getTransport().close();
+            throw e;
+          }
         }
       }
     } catch (TException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index d5ec324..a30db6c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -258,19 +259,30 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
     }
 
     private ByteBuffer lastSync(Node node, QueryContext context) throws TException {
-      try (SyncDataClient syncDataClient =
-          metaGroupMember
-              .getClientProvider()
-              .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
-        return syncDataClient.last(
+      SyncDataClient client = null;
+      try {
+        client =
+            metaGroupMember
+                .getClientProvider()
+                .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+        return client.last(
             new LastQueryRequest(
                 PartialPath.toStringList(seriesPaths),
                 dataTypeOrdinals,
                 context.getQueryId(),
                 queryPlan.getDeviceToMeasurements(),
                 group.getHeader(),
-                syncDataClient.getNode()));
+                client.getNode()));
+      } catch (IOException e) {
+        return null;
+      } catch (TException e) {
+        // the connection may be broken, close it to avoid it being reused
+        client.getInputProtocol().getTransport().close();
+        throw e;
+      } finally {
+        if (client != null) {
+          ClientUtils.putBackSyncClient(client);
+        }
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 64a2e3b..e3e0d53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -896,7 +896,13 @@ public class ClusterReaderFactory {
               .getClientProvider()
               .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
 
-        executorId = syncDataClient.getGroupByExecutor(request);
+        try {
+          executorId = syncDataClient.getGroupByExecutor(request);
+        } catch (TException e) {
+          // the connection may be broken, close it to avoid it being reused
+          syncDataClient.getInputProtocol().getTransport().close();
+          throw e;
+        }
       }
     }
     return executorId;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 8889535..4ba11e4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -153,27 +153,31 @@ public class DataSourceInfo {
   }
 
   private Long applyForReaderIdSync(Node node, boolean byTimestamp, long timestamp)
-      throws TException {
-
-    Long newReaderId;
+      throws TException, IOException {
+    long newReaderId;
     try (SyncDataClient client =
         this.metaGroupMember
             .getClientProvider()
             .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
-      if (byTimestamp) {
-        newReaderId = client.querySingleSeriesByTimestamp(request);
-      } else {
-        Filter newFilter;
-        // add timestamp to as a timeFilter to skip the data which has been read
-        if (request.isSetTimeFilterBytes()) {
-          Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
-          newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
+      try {
+        if (byTimestamp) {
+          newReaderId = client.querySingleSeriesByTimestamp(request);
         } else {
-          newFilter = TimeFilter.gt(timestamp);
+          Filter newFilter;
+          // add timestamp to as a timeFilter to skip the data which has been read
+          if (request.isSetTimeFilterBytes()) {
+            Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
+            newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
+          } else {
+            newFilter = TimeFilter.gt(timestamp);
+          }
+          request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
+          newReaderId = client.querySingleSeries(request);
         }
-        request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
-        newReaderId = client.querySingleSeries(request);
+      } catch (TException e) {
+        // the connection may be broken, close it to avoid it being reused
+        client.getInputProtocol().getTransport().close();
+        throw e;
       }
       return newReaderId;
     }
@@ -201,7 +205,7 @@ public class DataSourceInfo {
         : metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
   }
 
-  SyncDataClient getCurSyncClient(int timeout) throws TException {
+  SyncDataClient getCurSyncClient(int timeout) throws IOException {
     return isNoClient
         ? null
         : metaGroupMember.getClientProvider().getSyncDataClient(this.curSource, timeout);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index d077f02..e266af8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -108,6 +108,8 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
       return curSyncClient.fetchSingleSeriesByTimestamps(
           sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList);
     } catch (TException e) {
+      // the connection may be broken, close it to avoid it being reused
+      curSyncClient.getInputProtocol().getTransport().close();
       // try other node
       if (!sourceInfo.switchNode(true, timestamps[0])) {
         return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
index 2dcc1b7..f53f2bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
@@ -149,6 +149,8 @@ public class RemoteSimpleSeriesReader implements IPointReader {
       curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
       return curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getReaderId());
     } catch (TException e) {
+      // the connection may be broken, close it to avoid it being reused
+      curSyncClient.getInputProtocol().getTransport().close();
       // try other node
       if (!sourceInfo.switchNode(false, lastTimestamp)) {
         return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index 27c9f59..a4488aa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -172,9 +172,8 @@ public class MultDataSourceInfo {
     return result.get();
   }
 
-  private Long applyForReaderIdSync(Node node, long timestamp) throws TException {
-
-    Long newReaderId;
+  private Long applyForReaderIdSync(Node node, long timestamp) throws TException, IOException {
+    long newReaderId;
     try (SyncDataClient client =
         this.metaGroupMember
             .getClientProvider()
@@ -189,7 +188,13 @@ public class MultDataSourceInfo {
         newFilter = TimeFilter.gt(timestamp);
       }
       request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
-      newReaderId = client.queryMultSeries(request);
+      try {
+        newReaderId = client.queryMultSeries(request);
+      } catch (TException e) {
+        // the connection may be broken, close it to avoid it being reused
+        client.getInputProtocol().getTransport().close();
+        throw e;
+      }
       return newReaderId;
     }
   }
@@ -212,7 +217,7 @@ public class MultDataSourceInfo {
         : metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
   }
 
-  SyncDataClient getCurSyncClient(int timeout) throws TException {
+  SyncDataClient getCurSyncClient(int timeout) throws IOException {
     return isNoClient
         ? null
         : metaGroupMember.getClientProvider().getSyncDataClient(this.curSource, timeout);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index bf20b35..36513d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -181,15 +181,18 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
   }
 
   private Map<String, ByteBuffer> fetchResultSync(List<String> paths) throws IOException {
-
     try (SyncDataClient curSyncClient =
-        sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); ) {
-
-      return curSyncClient.fetchMultSeries(sourceInfo.getHeader(), sourceInfo.getReaderId(), paths);
-    } catch (TException e) {
-      logger.error("Failed to fetch result sync, connect to {}", sourceInfo, e);
-      return null;
+        sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS())) {
+      try {
+        return curSyncClient.fetchMultSeries(
+            sourceInfo.getHeader(), sourceInfo.getReaderId(), paths);
+      } catch (TException e) {
+        // the connection may be broken, close it to avoid it being reused
+        curSyncClient.getInputProtocol().getTransport().close();
+        logger.error("Failed to fetch result sync, connect to {}", sourceInfo, e);
+      }
     }
+    return null;
   }
 
   /** select path, which could batch-fetch result */
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index c627373..722aeaf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -317,7 +317,13 @@ public class ClientServer extends TSServiceImpl {
               try (SyncDataClient syncDataClient =
                   coordinator.getSyncDataClient(
                       queriedNode, RaftServer.getReadOperationTimeoutMS())) {
-                syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+                try {
+                  syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+                } catch (TException e) {
+                  // the connection may be broken, close it to avoid it being reused
+                  syncDataClient.getInputProtocol().getTransport().close();
+                  throw e;
+                }
               }
             }
           } catch (IOException | TException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 0459fef..67acc5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 
+import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -222,6 +223,7 @@ public class HeartbeatThread implements Runnable {
                 } catch (TTransportException e) {
                   logger.warn(
                       "{}: Cannot send heart beat to node {} due to network", memberName, node, e);
+                  // the connection may be broken, close it to avoid it being reused
                   client.getInputProtocol().getTransport().close();
                 } catch (Exception e) {
                   logger.warn("{}: Cannot send heart beat to node {}", memberName, node, e);
@@ -401,6 +403,11 @@ public class HeartbeatThread implements Runnable {
                 try {
                   long result = client.startElection(request);
                   handler.onComplete(result);
+                } catch (TException e) {
+                  // the connection may be broken, close it to avoid it being reused
+                  client.getInputProtocol().getTransport().close();
+                  logger.warn("{}: Cannot request a vote from {}", memberName, node, e);
+                  handler.onError(e);
                 } catch (Exception e) {
                   handler.onError(e);
                 } finally {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index cede9a3..1ed302d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -498,23 +498,22 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private void refreshClientOnceSync(Node receiver) {
-    RaftService.Client client;
+    RaftService.Client client = null;
     try {
       client =
           getClientProvider()
               .getSyncDataClientForRefresh(receiver, RaftServer.getWriteOperationTimeoutMS());
-    } catch (TException e) {
-      return;
-    }
-    try {
       RefreshReuqest req = new RefreshReuqest();
       client.refreshConnection(req);
+    } catch (IOException ignored) {
     } catch (TException e) {
-      logger.warn("encounter refreshing client timeout, throw broken connection", e);
+      logger.info("encounter refreshing client timeout, throw broken connection", e);
       // the connection may be broken, close it to avoid it being reused
       client.getInputProtocol().getTransport().close();
     } finally {
-      ClientUtils.putBackSyncClient(client);
+      if (client != null) {
+        ClientUtils.putBackSyncClient(client);
+      }
     }
   }
 
@@ -530,7 +529,7 @@ public class MetaGroupMember extends RaftMember {
     try {
       client.refreshConnection(new RefreshReuqest(), new GenericHandler<>(receiver, null));
     } catch (TException e) {
-      logger.warn("encounter refreshing client timeout, throw broken connection", e);
+      logger.info("encounter refreshing client timeout, throw broken connection", e);
     }
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
index d2ee0b7..3987450 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 
-import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.junit.After;
 import org.junit.Assert;
@@ -96,7 +95,7 @@ public class DataClientProviderTest {
       SyncDataClient client = null;
       try {
         client = provider.getSyncDataClient(node, 100);
-      } catch (TException e) {
+      } catch (IOException e) {
         Assert.fail(e.getMessage());
       } finally {
         ClientUtils.putBackSyncClient(client);
@@ -135,7 +134,7 @@ public class DataClientProviderTest {
       SyncDataClient client = null;
       try {
         client = provider.getSyncDataClient(node, 100);
-      } catch (TException e) {
+      } catch (IOException e) {
         Assert.fail(e.getMessage());
       }
       assertNotNull(client);