You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/12 09:03:22 UTC
[iotdb] branch master updated: Fix some issues in MPP framework and add client invalidate-all policy for internal RPC (#5872)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a514bfdf2c Fix some issues in MPP framework and add client invalidate-all policy for internal RPC (#5872)
a514bfdf2c is described below
commit a514bfdf2c7e8c4cc103910b6f34639ec4e370ca
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu May 12 17:03:15 2022 +0800
Fix some issues in MPP framework and add client invalidate-all policy for internal RPC (#5872)
---
.../client/sync/SyncConfigNodeIServiceClient.java | 19 +++++---
.../sync/SyncDataNodeDataBlockServiceClient.java | 19 +++++---
.../sync/SyncDataNodeInternalServiceClient.java | 21 +++++----
.../commons/client/sync/SyncThriftClient.java | 6 +++
.../sync/SyncThriftClientWithErrorHandler.java | 53 +++++++++++++++++++---
.../db/mpp/plan/execution/QueryExecution.java | 4 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 4 +-
.../service/thrift/impl/InternalServiceImpl.java | 1 +
8 files changed, 96 insertions(+), 31 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 8091839e43..ea1f1ec14d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
public class SyncConfigNodeIServiceClient extends ConfigIService.Client
implements SyncThriftClient, AutoCloseable {
- private final TEndPoint endpoint;
+ private final TEndPoint endPoint;
private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
public SyncConfigNodeIServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endpoint,
+ TEndPoint endPoint,
ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager)
throws TTransportException {
super(
@@ -54,17 +54,17 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
+ endPoint.getIp(),
+ endPoint.getPort(),
connectionTimeout))));
- this.endpoint = endpoint;
+ this.endPoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
public void close() {
if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
+ clientManager.returnClient(endPoint, this);
}
}
@@ -77,13 +77,18 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
getInputProtocol().getTransport().close();
}
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endPoint);
+ }
+
public int getTimeout() throws SocketException {
return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
@Override
public String toString() {
- return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
+ return String.format("SyncConfigNodeIServiceClient{%s}", endPoint);
}
public static class Factory extends BaseClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
index 79b4da8547..32f573c48a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
implements SyncThriftClient, AutoCloseable {
- private final TEndPoint endpoint;
+ private final TEndPoint endPoint;
private final ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager;
public SyncDataNodeDataBlockServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endpoint,
+ TEndPoint endPoint,
ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager)
throws TTransportException {
super(
@@ -54,17 +54,17 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
+ endPoint.getIp(),
+ endPoint.getPort(),
connectionTimeout))));
- this.endpoint = endpoint;
+ this.endPoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
public void close() {
if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
+ clientManager.returnClient(endPoint, this);
}
}
@@ -77,13 +77,18 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
getInputProtocol().getTransport().close();
}
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endPoint);
+ }
+
public int getTimeout() throws SocketException {
return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
@Override
public String toString() {
- return String.format("SyncDataNodeDataBlockServiceClient{%s}", endpoint);
+ return String.format("SyncDataNodeDataBlockServiceClient{%s}", endPoint);
}
public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 23d2c56f43..e3407516f4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -41,13 +41,13 @@ import java.net.SocketException;
public class SyncDataNodeInternalServiceClient extends InternalService.Client
implements SyncThriftClient, AutoCloseable {
- private final TEndPoint endpoint;
+ private final TEndPoint endPoint;
private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
public SyncDataNodeInternalServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endpoint,
+ TEndPoint endPoint,
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
throws TTransportException {
super(
@@ -55,17 +55,17 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endpoint.getIp(),
- endpoint.getPort(),
+ endPoint.getIp(),
+ endPoint.getPort(),
connectionTimeout))));
- this.endpoint = endpoint;
+ this.endPoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
@TestOnly
public TEndPoint getTEndpoint() {
- return endpoint;
+ return endPoint;
}
@TestOnly
@@ -75,7 +75,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
public void close() {
if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
+ clientManager.returnClient(endPoint, this);
}
}
@@ -88,13 +88,18 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
getInputProtocol().getTransport().close();
}
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endPoint);
+ }
+
public int getTimeout() throws SocketException {
return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
@Override
public String toString() {
- return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
+ return String.format("SyncDataNodeInternalServiceClient{%s}", endPoint);
}
public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
index 38eaa252b2..ff3862f879 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
@@ -22,4 +22,10 @@ public interface SyncThriftClient {
/** close the connection */
void invalidate();
+
+ /**
+ * Clears the client pool for this client's endPoint, removing every pooled instance that
+ * connects to that endPoint.
+ */
+ void invalidateAll();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
index 2792a2ba67..0601ebe251 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -21,13 +21,16 @@ package org.apache.iotdb.commons.client.sync;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.SocketException;
public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
@@ -50,15 +53,51 @@ public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
throws Throwable {
try {
return methodProxy.invokeSuper(o, objects);
- } catch (InvocationTargetException e) {
- if (e.getTargetException() instanceof TException) {
- LOGGER.error(
- "Error in calling method {}, err: {}", method.getName(), e.getTargetException());
+ } catch (Throwable t) {
+ Throwable origin = t;
+ if (t instanceof InvocationTargetException) {
+ origin = ((InvocationTargetException) t).getTargetException();
+ }
+ Throwable cur = origin;
+ if (cur instanceof TException) {
+ int level = 0;
+ while (cur != null) {
+ LOGGER.error(
+ "level-{} Exception class {}, message {}",
+ level,
+ cur.getClass().getName(),
+ cur.getMessage());
+ cur = cur.getCause();
+ level++;
+ }
((SyncThriftClient) o).invalidate();
}
- throw new TException("Error in calling method " + method.getName(), e.getTargetException());
- } catch (Exception e) {
- throw new TException("Error in calling method " + method.getName(), e);
+
+ Throwable rootCause = ExceptionUtils.getRootCause(origin);
+ if (rootCause != null) {
+ // If the root cause is a broken pipe (SocketException) or a socket closed by the peer
+ // (TTransportException), the remote node may have restarted, so all cached connections to it should be cleared.
+ LOGGER.error(
+ "root cause message {}, LocalizedMessage {}, ",
+ rootCause.getMessage(),
+ rootCause.getLocalizedMessage(),
+ rootCause);
+ if (isConnectionBroken(rootCause)) {
+ LOGGER.error(
+ "Broken pipe error happened in calling method {}, we need to clear all previous cached connection, err: {}",
+ method.getName(),
+ t);
+ ((SyncThriftClient) o).invalidate();
+ ((SyncThriftClient) o).invalidateAll();
+ }
+ }
+ throw new TException("Error in calling method " + method.getName(), t);
}
}
+
+ private boolean isConnectionBroken(Throwable cause) {
+ return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe"))
+ || (cause instanceof TTransportException
+ && cause.getMessage().contains("Socket is closed by peer"));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 0e5da23ad1..257559527c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -236,7 +236,9 @@ public class QueryExecution implements IQueryExecution {
// There are only two scenarios where the ResultHandle should be closed:
// 1. The client fetch all the result and the ResultHandle is finished.
// 2. The client's connection is closed that all owned QueryExecution should be cleaned up
- if (resultHandle != null && resultHandle.isFinished()) {
+ // If the QueryExecution's state is abnormal, we should also abort the resultHandle without
+ // waiting for it to finish.
+ if (resultHandle != null) {
resultHandle.abort();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 969ff3cca1..ba53e4fe2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,8 +70,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
TSendFragmentInstanceReq req =
new TSendFragmentInstanceReq(
new TFragmentInstance(buffer), groupId, instance.getType().toString());
+ LOGGER.info("send FragmentInstance[{}] to {}", instance.getId(), endPoint);
resp = client.sendFragmentInstance(req);
- } catch (IOException e) {
+ } catch (IOException | TException e) {
LOGGER.error("can't connect to node {}", endPoint, e);
throw e;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index e2a1fff0c3..54b9f82278 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -87,6 +87,7 @@ public class InternalServiceImpl implements InternalService.Iface {
@Override
public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
+ LOGGER.info("receive FragmentInstance to group[{}]", req.getConsensusGroupId());
QueryType type = QueryType.valueOf(req.queryType);
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());