You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/07/16 09:31:17 UTC
[iotdb] 01/02: fix
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch autoai_cluster
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a11a5611d2fb11607df9beb9edcfd1c69f9e0ca0
Author: LebronAl <TX...@gmail.com>
AuthorDate: Thu Jul 15 19:03:39 2021 +0800
fix
---
.../iotdb/cluster/client/DataClientProvider.java | 9 ++-
.../iotdb/cluster/coordinator/Coordinator.java | 17 ++----
.../apache/iotdb/cluster/metadata/CMManager.java | 71 ++++++++++++++++------
.../apache/iotdb/cluster/metadata/MetaPuller.java | 18 ++++--
.../iotdb/cluster/query/ClusterPlanExecutor.java | 39 +++++++++---
.../cluster/query/aggregate/ClusterAggregator.java | 9 ++-
.../cluster/query/fill/ClusterPreviousFill.java | 25 +++++---
.../query/groupby/RemoteGroupByExecutor.java | 21 +++++--
.../query/last/ClusterLastQueryExecutor.java | 26 +++++---
.../cluster/query/reader/ClusterReaderFactory.java | 8 ++-
.../iotdb/cluster/query/reader/DataSourceInfo.java | 36 ++++++-----
.../reader/RemoteSeriesReaderByTimestamp.java | 2 +
.../query/reader/RemoteSimpleSeriesReader.java | 2 +
.../query/reader/mult/MultDataSourceInfo.java | 15 +++--
.../query/reader/mult/RemoteMultSeriesReader.java | 17 +++---
.../apache/iotdb/cluster/server/ClientServer.java | 8 ++-
.../cluster/server/heartbeat/HeartbeatThread.java | 7 +++
.../cluster/server/member/MetaGroupMember.java | 15 +++--
.../cluster/client/DataClientProviderTest.java | 5 +-
19 files changed, 239 insertions(+), 111 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 106705f..0950958 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
import java.io.IOException;
@@ -102,10 +101,10 @@ public class DataClientProvider {
* @param node the node to be connected
* @param timeout timeout threshold of connection
*/
- public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
+ public SyncDataClient getSyncDataClient(Node node, int timeout) throws IOException {
SyncDataClient client = (SyncDataClient) getDataSyncClientPool().getClient(node);
if (client == null) {
- throw new TException(GET_CLIENT_FAILED_MSG + node);
+ throw new IOException(GET_CLIENT_FAILED_MSG + node);
}
client.setTimeout(timeout);
return client;
@@ -121,10 +120,10 @@ public class DataClientProvider {
* @param node the node to be connected
* @param timeout timeout threshold of connection
*/
- public SyncDataClient getSyncDataClientForRefresh(Node node, int timeout) throws TException {
+ public SyncDataClient getSyncDataClientForRefresh(Node node, int timeout) throws IOException {
SyncDataClient client = (SyncDataClient) getDataSyncClientPool().getClientForRefresh(node);
if (client == null) {
- throw new TException(GET_CLIENT_FAILED_MSG + node);
+ throw new IOException(GET_CLIENT_FAILED_MSG + node);
}
client.setTimeout(timeout);
return client;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 11f99e8..db0fce3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -58,7 +58,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -703,15 +702,11 @@ public class Coordinator {
private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
throws IOException {
- RaftService.Client client = null;
- try {
- client =
- metaGroupMember
- .getClientProvider()
- .getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
- } catch (TException e) {
- throw new IOException(e);
- }
+ RaftService.Client client =
+ metaGroupMember
+ .getClientProvider()
+ .getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
+
return this.metaGroupMember.forwardPlanSync(plan, receiver, header, client);
}
@@ -735,7 +730,7 @@ public class Coordinator {
* @param node the node to be connected
* @param timeout timeout threshold of connection
*/
- public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
+ public SyncDataClient getSyncDataClient(Node node, int timeout) throws IOException {
return metaGroupMember.getClientProvider().getSyncDataClient(node, timeout);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 319159a..05d61a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -794,8 +794,14 @@ public class CMManager extends MManager {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
- result =
- syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+ try {
+ result =
+ syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
if (result != null) {
@@ -978,13 +984,18 @@ public class CMManager extends MManager {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
- PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
- ByteBuffer buffer = pullSchemaResp.schemaBytes;
- int size = buffer.getInt();
- schemas = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- schemas.add(TimeseriesSchema.deserializeFrom(buffer));
+ try {
+ PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
+ ByteBuffer buffer = pullSchemaResp.schemaBytes;
+ int size = buffer.getInt();
+ schemas = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ schemas.add(TimeseriesSchema.deserializeFrom(buffer));
+ }
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
}
}
}
@@ -1212,8 +1223,13 @@ public class CMManager extends MManager {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
- result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+ try {
+ result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
@@ -1338,8 +1354,13 @@ public class CMManager extends MManager {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
- paths = syncDataClient.getAllDevices(header, pathsToQuery);
+ try {
+ paths = syncDataClient.getAllDevices(header, pathsToQuery);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
return paths;
@@ -1792,9 +1813,15 @@ public class CMManager extends MManager {
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
plan.serialize(dataOutputStream);
- resultBinary =
- syncDataClient.getAllMeasurementSchema(
- group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ try {
+ resultBinary =
+ syncDataClient.getAllMeasurementSchema(
+ group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
return resultBinary;
@@ -1818,9 +1845,15 @@ public class CMManager extends MManager {
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
plan.serialize(dataOutputStream);
- resultBinary =
- syncDataClient.getDevices(
- group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ try {
+ resultBinary =
+ syncDataClient.getDevices(
+ group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
return resultBinary;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e524772..9991c5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -231,12 +231,18 @@ public class MetaPuller {
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
// only need measurement name
- PullSchemaResp pullSchemaResp = syncDataClient.pullMeasurementSchema(request);
- ByteBuffer buffer = pullSchemaResp.schemaBytes;
- int size = buffer.getInt();
- schemas = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- schemas.add(MeasurementSchema.deserializeFrom(buffer));
+ try {
+ PullSchemaResp pullSchemaResp = syncDataClient.pullMeasurementSchema(request);
+ ByteBuffer buffer = pullSchemaResp.schemaBytes;
+ int size = buffer.getInt();
+ schemas = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ schemas.add(MeasurementSchema.deserializeFrom(buffer));
+ }
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index ca16cfb..34e9904 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -259,8 +259,13 @@ public class ClusterPlanExecutor extends PlanExecutor {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
- syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
- count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+ try {
+ count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
logger.debug(
@@ -363,8 +368,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
- paths =
- syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+ try {
+ paths =
+ syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
if (paths != null) {
@@ -449,8 +460,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
- nextChildrenNodes =
- syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
+ try {
+ nextChildrenNodes =
+ syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
if (nextChildrenNodes != null) {
@@ -558,8 +575,14 @@ public class ClusterPlanExecutor extends PlanExecutor {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
- nextChildren =
- syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+ try {
+ nextChildren =
+ syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
if (nextChildren != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index 6c33f50..dc52923 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -274,8 +274,13 @@ public class ClusterAggregator {
metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
- resultBuffers = syncDataClient.getAggrResult(request);
+ try {
+ resultBuffers = syncDataClient.getAggrResult(request);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
return resultBuffers;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 33274e3..9af7082 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.handlers.caller.PreviousFillHandler;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils.Intervals;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -41,6 +42,7 @@ import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,19 +242,28 @@ public class ClusterPreviousFill extends PreviousFill {
private ByteBuffer remoteSyncPreviousFill(
Node node, PreviousFillRequest request, PreviousFillArguments arguments) {
ByteBuffer byteBuffer = null;
- try (SyncDataClient syncDataClient =
- metaGroupMember
- .getClientProvider()
- .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
- byteBuffer = syncDataClient.previousFill(request);
- } catch (Exception e) {
+ SyncDataClient client = null;
+ try {
+ client =
+ metaGroupMember
+ .getClientProvider()
+ .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+ byteBuffer = client.previousFill(request);
+ } catch (IOException e) {
+ logger.warn("{}: Cannot connect to {} during previous fill", metaGroupMember, node);
+ } catch (TException e) {
logger.error(
"{}: Cannot perform previous fill of {} to {}",
metaGroupMember.getName(),
arguments.getPath(),
node,
e);
+ // the connection may be broken, close it to avoid it being reused
+ client.getInputProtocol().getTransport().close();
+ } finally {
+ if (client != null) {
+ ClientUtils.putBackSyncClient(client);
+ }
}
return byteBuffer;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index 02df747..468ec57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -89,8 +89,14 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
.getClientProvider()
.getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS())) {
- aggrBuffers =
- syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+ try {
+ aggrBuffers =
+ syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
} catch (TException e) {
@@ -133,9 +139,14 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
metaGroupMember
.getClientProvider()
.getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS())) {
-
- aggrBuffer =
- syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+ try {
+ aggrBuffer =
+ syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
} catch (TException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index d5ec324..a30db6c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -258,19 +259,30 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
}
private ByteBuffer lastSync(Node node, QueryContext context) throws TException {
- try (SyncDataClient syncDataClient =
- metaGroupMember
- .getClientProvider()
- .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
- return syncDataClient.last(
+ SyncDataClient client = null;
+ try {
+ client =
+ metaGroupMember
+ .getClientProvider()
+ .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+ return client.last(
new LastQueryRequest(
PartialPath.toStringList(seriesPaths),
dataTypeOrdinals,
context.getQueryId(),
queryPlan.getDeviceToMeasurements(),
group.getHeader(),
- syncDataClient.getNode()));
+ client.getNode()));
+ } catch (IOException e) {
+ return null;
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ client.getInputProtocol().getTransport().close();
+ throw e;
+ } finally {
+ if (client != null) {
+ ClientUtils.putBackSyncClient(client);
+ }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 64a2e3b..e3e0d53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -896,7 +896,13 @@ public class ClusterReaderFactory {
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
- executorId = syncDataClient.getGroupByExecutor(request);
+ try {
+ executorId = syncDataClient.getGroupByExecutor(request);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
return executorId;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 8889535..4ba11e4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -153,27 +153,31 @@ public class DataSourceInfo {
}
private Long applyForReaderIdSync(Node node, boolean byTimestamp, long timestamp)
- throws TException {
-
- Long newReaderId;
+ throws TException, IOException {
+ long newReaderId;
try (SyncDataClient client =
this.metaGroupMember
.getClientProvider()
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
-
- if (byTimestamp) {
- newReaderId = client.querySingleSeriesByTimestamp(request);
- } else {
- Filter newFilter;
- // add timestamp to as a timeFilter to skip the data which has been read
- if (request.isSetTimeFilterBytes()) {
- Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
- newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
+ try {
+ if (byTimestamp) {
+ newReaderId = client.querySingleSeriesByTimestamp(request);
} else {
- newFilter = TimeFilter.gt(timestamp);
+ Filter newFilter;
+ // add the timestamp as a time filter to skip the data that has already been read
+ if (request.isSetTimeFilterBytes()) {
+ Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
+ newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
+ } else {
+ newFilter = TimeFilter.gt(timestamp);
+ }
+ request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
+ newReaderId = client.querySingleSeries(request);
}
- request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
- newReaderId = client.querySingleSeries(request);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ client.getInputProtocol().getTransport().close();
+ throw e;
}
return newReaderId;
}
@@ -201,7 +205,7 @@ public class DataSourceInfo {
: metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
}
- SyncDataClient getCurSyncClient(int timeout) throws TException {
+ SyncDataClient getCurSyncClient(int timeout) throws IOException {
return isNoClient
? null
: metaGroupMember.getClientProvider().getSyncDataClient(this.curSource, timeout);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index d077f02..e266af8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -108,6 +108,8 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
return curSyncClient.fetchSingleSeriesByTimestamps(
sourceInfo.getHeader(), sourceInfo.getReaderId(), timestampList);
} catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ curSyncClient.getInputProtocol().getTransport().close();
// try other node
if (!sourceInfo.switchNode(true, timestamps[0])) {
return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
index 2dcc1b7..f53f2bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
@@ -149,6 +149,8 @@ public class RemoteSimpleSeriesReader implements IPointReader {
curSyncClient = sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
return curSyncClient.fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getReaderId());
} catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ curSyncClient.getInputProtocol().getTransport().close();
// try other node
if (!sourceInfo.switchNode(false, lastTimestamp)) {
return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index 27c9f59..a4488aa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -172,9 +172,8 @@ public class MultDataSourceInfo {
return result.get();
}
- private Long applyForReaderIdSync(Node node, long timestamp) throws TException {
-
- Long newReaderId;
+ private Long applyForReaderIdSync(Node node, long timestamp) throws TException, IOException {
+ long newReaderId;
try (SyncDataClient client =
this.metaGroupMember
.getClientProvider()
@@ -189,7 +188,13 @@ public class MultDataSourceInfo {
newFilter = TimeFilter.gt(timestamp);
}
request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
- newReaderId = client.queryMultSeries(request);
+ try {
+ newReaderId = client.queryMultSeries(request);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ client.getInputProtocol().getTransport().close();
+ throw e;
+ }
return newReaderId;
}
}
@@ -212,7 +217,7 @@ public class MultDataSourceInfo {
: metaGroupMember.getClientProvider().getAsyncDataClient(this.curSource, timeout);
}
- SyncDataClient getCurSyncClient(int timeout) throws TException {
+ SyncDataClient getCurSyncClient(int timeout) throws IOException {
return isNoClient
? null
: metaGroupMember.getClientProvider().getSyncDataClient(this.curSource, timeout);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index bf20b35..36513d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -181,15 +181,18 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
}
private Map<String, ByteBuffer> fetchResultSync(List<String> paths) throws IOException {
-
try (SyncDataClient curSyncClient =
- sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS()); ) {
-
- return curSyncClient.fetchMultSeries(sourceInfo.getHeader(), sourceInfo.getReaderId(), paths);
- } catch (TException e) {
- logger.error("Failed to fetch result sync, connect to {}", sourceInfo, e);
- return null;
+ sourceInfo.getCurSyncClient(RaftServer.getReadOperationTimeoutMS())) {
+ try {
+ return curSyncClient.fetchMultSeries(
+ sourceInfo.getHeader(), sourceInfo.getReaderId(), paths);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ curSyncClient.getInputProtocol().getTransport().close();
+ logger.error("Failed to fetch result sync, connect to {}", sourceInfo, e);
+ }
}
+ return null;
}
/** select path, which could batch-fetch result */
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index c627373..722aeaf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -317,7 +317,13 @@ public class ClientServer extends TSServiceImpl {
try (SyncDataClient syncDataClient =
coordinator.getSyncDataClient(
queriedNode, RaftServer.getReadOperationTimeoutMS())) {
- syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+ try {
+ syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e;
+ }
}
}
} catch (IOException | TException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 0459fef..67acc5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -222,6 +223,7 @@ public class HeartbeatThread implements Runnable {
} catch (TTransportException e) {
logger.warn(
"{}: Cannot send heart beat to node {} due to network", memberName, node, e);
+ // the connection may be broken, close it to avoid it being reused
client.getInputProtocol().getTransport().close();
} catch (Exception e) {
logger.warn("{}: Cannot send heart beat to node {}", memberName, node, e);
@@ -401,6 +403,11 @@ public class HeartbeatThread implements Runnable {
try {
long result = client.startElection(request);
handler.onComplete(result);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ client.getInputProtocol().getTransport().close();
+ logger.warn("{}: Cannot request a vote from {}", memberName, node, e);
+ handler.onError(e);
} catch (Exception e) {
handler.onError(e);
} finally {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index cede9a3..1ed302d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -498,23 +498,22 @@ public class MetaGroupMember extends RaftMember {
}
private void refreshClientOnceSync(Node receiver) {
- RaftService.Client client;
+ RaftService.Client client = null;
try {
client =
getClientProvider()
.getSyncDataClientForRefresh(receiver, RaftServer.getWriteOperationTimeoutMS());
- } catch (TException e) {
- return;
- }
- try {
RefreshReuqest req = new RefreshReuqest();
client.refreshConnection(req);
+ } catch (IOException ignored) {
} catch (TException e) {
- logger.warn("encounter refreshing client timeout, throw broken connection", e);
+ logger.info("encounter refreshing client timeout, throw broken connection", e);
// the connection may be broken, close it to avoid it being reused
client.getInputProtocol().getTransport().close();
} finally {
- ClientUtils.putBackSyncClient(client);
+ if (client != null) {
+ ClientUtils.putBackSyncClient(client);
+ }
}
}
@@ -530,7 +529,7 @@ public class MetaGroupMember extends RaftMember {
try {
client.refreshConnection(new RefreshReuqest(), new GenericHandler<>(receiver, null));
} catch (TException e) {
- logger.warn("encounter refreshing client timeout, throw broken connection", e);
+ logger.info("encounter refreshing client timeout, throw broken connection", e);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
index d2ee0b7..3987450 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterNode;
-import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Assert;
@@ -96,7 +95,7 @@ public class DataClientProviderTest {
SyncDataClient client = null;
try {
client = provider.getSyncDataClient(node, 100);
- } catch (TException e) {
+ } catch (IOException e) {
Assert.fail(e.getMessage());
} finally {
ClientUtils.putBackSyncClient(client);
@@ -135,7 +134,7 @@ public class DataClientProviderTest {
SyncDataClient client = null;
try {
client = provider.getSyncDataClient(node, 100);
- } catch (TException e) {
+ } catch (IOException e) {
Assert.fail(e.getMessage());
}
assertNotNull(client);