You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/11 13:05:43 UTC
[iotdb] branch BrokenPipe created (now ee335be10a)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a change to branch BrokenPipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
at ee335be10a Fix cached client error after node restarting
This branch includes the following new commits:
new ee335be10a Fix cached client error after node restarting
The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[iotdb] 01/01: Fix cached client error after node restarting
Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch BrokenPipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ee335be10a926c67987aa09135eb8add01703c04
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed May 11 21:04:55 2022 +0800
Fix cached client error after node restarting
---
.../client/sync/SyncConfigNodeIServiceClient.java | 19 ++++++++++++-------
.../sync/SyncDataNodeDataBlockServiceClient.java | 19 ++++++++++++-------
.../sync/SyncDataNodeInternalServiceClient.java | 21 +++++++++++++--------
.../iotdb/commons/client/sync/SyncThriftClient.java | 6 ++++++
.../sync/SyncThriftClientWithErrorHandler.java | 21 ++++++++++++++++++---
5 files changed, 61 insertions(+), 25 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 8091839e43..ea1f1ec14d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
public class SyncConfigNodeIServiceClient extends ConfigIService.Client
implements SyncThriftClient, AutoCloseable {
- private final TEndPoint endpoint;
+ private final TEndPoint endPoint;
private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
public SyncConfigNodeIServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endpoint,
+ TEndPoint endPoint,
ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager)
throws TTransportException {
super(
@@ -54,17 +54,17 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
+ endPoint.getIp(),
+ endPoint.getPort(),
connectionTimeout))));
- this.endpoint = endpoint;
+ this.endPoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
public void close() {
if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
+ clientManager.returnClient(endPoint, this);
}
}
@@ -77,13 +77,18 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
getInputProtocol().getTransport().close();
}
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endPoint);
+ }
+
public int getTimeout() throws SocketException {
return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
@Override
public String toString() {
- return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
+ return String.format("SyncConfigNodeIServiceClient{%s}", endPoint);
}
public static class Factory extends BaseClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
index 79b4da8547..32f573c48a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
implements SyncThriftClient, AutoCloseable {
- private final TEndPoint endpoint;
+ private final TEndPoint endPoint;
private final ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager;
public SyncDataNodeDataBlockServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endpoint,
+ TEndPoint endPoint,
ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager)
throws TTransportException {
super(
@@ -54,17 +54,17 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
+ endPoint.getIp(),
+ endPoint.getPort(),
connectionTimeout))));
- this.endpoint = endpoint;
+ this.endPoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
public void close() {
if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
+ clientManager.returnClient(endPoint, this);
}
}
@@ -77,13 +77,18 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
getInputProtocol().getTransport().close();
}
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endPoint);
+ }
+
public int getTimeout() throws SocketException {
return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
@Override
public String toString() {
- return String.format("SyncDataNodeDataBlockServiceClient{%s}", endpoint);
+ return String.format("SyncDataNodeDataBlockServiceClient{%s}", endPoint);
}
public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 23d2c56f43..e3407516f4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -41,13 +41,13 @@ import java.net.SocketException;
public class SyncDataNodeInternalServiceClient extends InternalService.Client
implements SyncThriftClient, AutoCloseable {
- private final TEndPoint endpoint;
+ private final TEndPoint endPoint;
private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
public SyncDataNodeInternalServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endpoint,
+ TEndPoint endPoint,
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
throws TTransportException {
super(
@@ -55,17 +55,17 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
+ endPoint.getIp(),
+ endPoint.getPort(),
connectionTimeout))));
- this.endpoint = endpoint;
+ this.endPoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
@TestOnly
public TEndPoint getTEndpoint() {
- return endpoint;
+ return endPoint;
}
@TestOnly
@@ -75,7 +75,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
public void close() {
if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
+ clientManager.returnClient(endPoint, this);
}
}
@@ -88,13 +88,18 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
getInputProtocol().getTransport().close();
}
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endPoint);
+ }
+
public int getTimeout() throws SocketException {
return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
@Override
public String toString() {
- return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
+ return String.format("SyncDataNodeInternalServiceClient{%s}", endPoint);
}
public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
index 38eaa252b2..ff3862f879 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
@@ -22,4 +22,10 @@ public interface SyncThriftClient {
/** close the connection */
void invalidate();
+
+ /**
+ * Clears the specified pool, removing all pooled instances corresponding to the current
+ * instance's endPoint.
+ */
+ void invalidateAll();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
index 2792a2ba67..09a2a00946 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.client.sync;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.SocketException;
public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
@@ -52,9 +54,22 @@ public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
return methodProxy.invokeSuper(o, objects);
} catch (InvocationTargetException e) {
if (e.getTargetException() instanceof TException) {
- LOGGER.error(
- "Error in calling method {}, err: {}", method.getName(), e.getTargetException());
- ((SyncThriftClient) o).invalidate();
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ // If the root cause is a SocketException whose message contains "Broken pipe", the remote
+ // node may have restarted, so all the connections we cached before should be cleared.
+ if (rootCause instanceof SocketException
+ && rootCause.getMessage().contains("Broken pipe")) {
+ LOGGER.error(
+ "Broken pipe error happened in calling method {}, we need to clear all previous cached connection, err: {}",
+ method.getName(),
+ e.getTargetException());
+ ((SyncThriftClient) o).invalidate();
+ ((SyncThriftClient) o).invalidateAll();
+ } else {
+ LOGGER.error(
+ "Error in calling method {}, err: {}", method.getName(), e.getTargetException());
+ ((SyncThriftClient) o).invalidate();
+ }
}
throw new TException("Error in calling method " + method.getName(), e.getTargetException());
} catch (Exception e) {