You are viewing a plain-text version of this content. The link to the canonical version ("here") was a hyperlink that did not survive the plain-text conversion.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/12 09:03:22 UTC

[iotdb] branch master updated: Fix some issues in MPP framework and add client invalidate-all policy for internal RPC (#5872)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a514bfdf2c Fix some issues in MPP framework and add client invalidate-all policy for internal RPC (#5872)
a514bfdf2c is described below

commit a514bfdf2c7e8c4cc103910b6f34639ec4e370ca
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu May 12 17:03:15 2022 +0800

    Fix some issues in MPP framework and add client invalidate-all policy for internal RPC (#5872)
---
 .../client/sync/SyncConfigNodeIServiceClient.java  | 19 +++++---
 .../sync/SyncDataNodeDataBlockServiceClient.java   | 19 +++++---
 .../sync/SyncDataNodeInternalServiceClient.java    | 21 +++++----
 .../commons/client/sync/SyncThriftClient.java      |  6 +++
 .../sync/SyncThriftClientWithErrorHandler.java     | 53 +++++++++++++++++++---
 .../db/mpp/plan/execution/QueryExecution.java      |  4 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  4 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  1 +
 8 files changed, 96 insertions(+), 31 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 8091839e43..ea1f1ec14d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
 public class SyncConfigNodeIServiceClient extends ConfigIService.Client
     implements SyncThriftClient, AutoCloseable {
 
-  private final TEndPoint endpoint;
+  private final TEndPoint endPoint;
   private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
 
   public SyncConfigNodeIServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endpoint,
+      TEndPoint endPoint,
       ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -54,17 +54,17 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endpoint.getIp(),
-                    endpoint.getPort(),
+                    endPoint.getIp(),
+                    endPoint.getPort(),
                     connectionTimeout))));
-    this.endpoint = endpoint;
+    this.endPoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
   public void close() {
     if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
+      clientManager.returnClient(endPoint, this);
     }
   }
 
@@ -77,13 +77,18 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
     getInputProtocol().getTransport().close();
   }
 
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endPoint);
+  }
+
   public int getTimeout() throws SocketException {
     return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
+    return String.format("SyncConfigNodeIServiceClient{%s}", endPoint);
   }
 
   public static class Factory extends BaseClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
index 79b4da8547..32f573c48a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
 public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
     implements SyncThriftClient, AutoCloseable {
 
-  private final TEndPoint endpoint;
+  private final TEndPoint endPoint;
   private final ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager;
 
   public SyncDataNodeDataBlockServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endpoint,
+      TEndPoint endPoint,
       ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -54,17 +54,17 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endpoint.getIp(),
-                    endpoint.getPort(),
+                    endPoint.getIp(),
+                    endPoint.getPort(),
                     connectionTimeout))));
-    this.endpoint = endpoint;
+    this.endPoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
   public void close() {
     if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
+      clientManager.returnClient(endPoint, this);
     }
   }
 
@@ -77,13 +77,18 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
     getInputProtocol().getTransport().close();
   }
 
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endPoint);
+  }
+
   public int getTimeout() throws SocketException {
     return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeDataBlockServiceClient{%s}", endpoint);
+    return String.format("SyncDataNodeDataBlockServiceClient{%s}", endPoint);
   }
 
   public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 23d2c56f43..e3407516f4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -41,13 +41,13 @@ import java.net.SocketException;
 public class SyncDataNodeInternalServiceClient extends InternalService.Client
     implements SyncThriftClient, AutoCloseable {
 
-  private final TEndPoint endpoint;
+  private final TEndPoint endPoint;
   private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
 
   public SyncDataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endpoint,
+      TEndPoint endPoint,
       ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -55,17 +55,17 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endpoint.getIp(),
-                    endpoint.getPort(),
+                    endPoint.getIp(),
+                    endPoint.getPort(),
                     connectionTimeout))));
-    this.endpoint = endpoint;
+    this.endPoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
   @TestOnly
   public TEndPoint getTEndpoint() {
-    return endpoint;
+    return endPoint;
   }
 
   @TestOnly
@@ -75,7 +75,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
 
   public void close() {
     if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
+      clientManager.returnClient(endPoint, this);
     }
   }
 
@@ -88,13 +88,18 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
     getInputProtocol().getTransport().close();
   }
 
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endPoint);
+  }
+
   public int getTimeout() throws SocketException {
     return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
+    return String.format("SyncDataNodeInternalServiceClient{%s}", endPoint);
   }
 
   public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
index 38eaa252b2..ff3862f879 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
@@ -22,4 +22,10 @@ public interface SyncThriftClient {
 
   /** close the connection */
   void invalidate();
+
+  /**
+   * Clears the specified pool, removing all pooled instances corresponding to current instance's
+   * endPoint.
+   */
+  void invalidateAll();
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
index 2792a2ba67..0601ebe251 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -21,13 +21,16 @@ package org.apache.iotdb.commons.client.sync;
 import net.sf.cglib.proxy.Enhancer;
 import net.sf.cglib.proxy.MethodInterceptor;
 import net.sf.cglib.proxy.MethodProxy;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.SocketException;
 
 public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
 
@@ -50,15 +53,51 @@ public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
       throws Throwable {
     try {
       return methodProxy.invokeSuper(o, objects);
-    } catch (InvocationTargetException e) {
-      if (e.getTargetException() instanceof TException) {
-        LOGGER.error(
-            "Error in calling method {}, err: {}", method.getName(), e.getTargetException());
+    } catch (Throwable t) {
+      Throwable origin = t;
+      if (t instanceof InvocationTargetException) {
+        origin = ((InvocationTargetException) t).getTargetException();
+      }
+      Throwable cur = origin;
+      if (cur instanceof TException) {
+        int level = 0;
+        while (cur != null) {
+          LOGGER.error(
+              "level-{} Exception class {}, message {}",
+              level,
+              cur.getClass().getName(),
+              cur.getMessage());
+          cur = cur.getCause();
+          level++;
+        }
         ((SyncThriftClient) o).invalidate();
       }
-      throw new TException("Error in calling method " + method.getName(), e.getTargetException());
-    } catch (Exception e) {
-      throw new TException("Error in calling method " + method.getName(), e);
+
+      Throwable rootCause = ExceptionUtils.getRootCause(origin);
+      if (rootCause != null) {
+        // if the exception is SocketException and its error message is Broken pipe, it means that
+        // the remote node may restart and all the connection we cached before should be cleared.
+        LOGGER.error(
+            "root cause message {}, LocalizedMessage {}, ",
+            rootCause.getMessage(),
+            rootCause.getLocalizedMessage(),
+            rootCause);
+        if (isConnectionBroken(rootCause)) {
+          LOGGER.error(
+              "Broken pipe error happened in calling method {}, we need to clear all previous cached connection, err: {}",
+              method.getName(),
+              t);
+          ((SyncThriftClient) o).invalidate();
+          ((SyncThriftClient) o).invalidateAll();
+        }
+      }
+      throw new TException("Error in calling method " + method.getName(), t);
     }
   }
+
+  private boolean isConnectionBroken(Throwable cause) {
+    return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe"))
+        || (cause instanceof TTransportException
+            && cause.getMessage().contains("Socket is closed by peer"));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 0e5da23ad1..257559527c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -236,7 +236,9 @@ public class QueryExecution implements IQueryExecution {
     // There are only two scenarios where the ResultHandle should be closed:
     //   1. The client fetch all the result and the ResultHandle is finished.
     //   2. The client's connection is closed that all owned QueryExecution should be cleaned up
-    if (resultHandle != null && resultHandle.isFinished()) {
+    // If the QueryExecution's state is abnormal, we should also abort the resultHandle without
+    // waiting it to be finished.
+    if (resultHandle != null) {
       resultHandle.abort();
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 969ff3cca1..ba53e4fe2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,8 +70,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
               TSendFragmentInstanceReq req =
                   new TSendFragmentInstanceReq(
                       new TFragmentInstance(buffer), groupId, instance.getType().toString());
+              LOGGER.info("send FragmentInstance[{}] to {}", instance.getId(), endPoint);
               resp = client.sendFragmentInstance(req);
-            } catch (IOException e) {
+            } catch (IOException | TException e) {
               LOGGER.error("can't connect to node {}", endPoint, e);
               throw e;
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index e2a1fff0c3..54b9f82278 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -87,6 +87,7 @@ public class InternalServiceImpl implements InternalService.Iface {
 
   @Override
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
+    LOGGER.info("receive FragmentInstance to group[{}]", req.getConsensusGroupId());
     QueryType type = QueryType.valueOf(req.queryType);
     ConsensusGroupId groupId =
         ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());