You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/11 13:05:43 UTC

[iotdb] branch BrokenPipe created (now ee335be10a)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch BrokenPipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at ee335be10a Fix cached client error after node restarting

This branch includes the following new commits:

     new ee335be10a Fix cached client error after node restarting

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Fix cached client error after node restarting

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch BrokenPipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ee335be10a926c67987aa09135eb8add01703c04
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed May 11 21:04:55 2022 +0800

    Fix cached client error after node restarting
---
 .../client/sync/SyncConfigNodeIServiceClient.java   | 19 ++++++++++++-------
 .../sync/SyncDataNodeDataBlockServiceClient.java    | 19 ++++++++++++-------
 .../sync/SyncDataNodeInternalServiceClient.java     | 21 +++++++++++++--------
 .../iotdb/commons/client/sync/SyncThriftClient.java |  6 ++++++
 .../sync/SyncThriftClientWithErrorHandler.java      | 21 ++++++++++++++++++---
 5 files changed, 61 insertions(+), 25 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 8091839e43..ea1f1ec14d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
 public class SyncConfigNodeIServiceClient extends ConfigIService.Client
     implements SyncThriftClient, AutoCloseable {
 
-  private final TEndPoint endpoint;
+  private final TEndPoint endPoint;
   private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
 
   public SyncConfigNodeIServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endpoint,
+      TEndPoint endPoint,
       ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -54,17 +54,17 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endpoint.getIp(),
-                    endpoint.getPort(),
+                    endPoint.getIp(),
+                    endPoint.getPort(),
                     connectionTimeout))));
-    this.endpoint = endpoint;
+    this.endPoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
   public void close() {
     if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
+      clientManager.returnClient(endPoint, this);
     }
   }
 
@@ -77,13 +77,18 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client
     getInputProtocol().getTransport().close();
   }
 
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endPoint);
+  }
+
   public int getTimeout() throws SocketException {
     return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
+    return String.format("SyncConfigNodeIServiceClient{%s}", endPoint);
   }
 
   public static class Factory extends BaseClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
index 79b4da8547..32f573c48a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
 public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
     implements SyncThriftClient, AutoCloseable {
 
-  private final TEndPoint endpoint;
+  private final TEndPoint endPoint;
   private final ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager;
 
   public SyncDataNodeDataBlockServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endpoint,
+      TEndPoint endPoint,
       ClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -54,17 +54,17 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endpoint.getIp(),
-                    endpoint.getPort(),
+                    endPoint.getIp(),
+                    endPoint.getPort(),
                     connectionTimeout))));
-    this.endpoint = endpoint;
+    this.endPoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
   public void close() {
     if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
+      clientManager.returnClient(endPoint, this);
     }
   }
 
@@ -77,13 +77,18 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
     getInputProtocol().getTransport().close();
   }
 
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endPoint);
+  }
+
   public int getTimeout() throws SocketException {
     return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeDataBlockServiceClient{%s}", endpoint);
+    return String.format("SyncDataNodeDataBlockServiceClient{%s}", endPoint);
   }
 
   public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 23d2c56f43..e3407516f4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -41,13 +41,13 @@ import java.net.SocketException;
 public class SyncDataNodeInternalServiceClient extends InternalService.Client
     implements SyncThriftClient, AutoCloseable {
 
-  private final TEndPoint endpoint;
+  private final TEndPoint endPoint;
   private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
 
   public SyncDataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endpoint,
+      TEndPoint endPoint,
       ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -55,17 +55,17 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endpoint.getIp(),
-                    endpoint.getPort(),
+                    endPoint.getIp(),
+                    endPoint.getPort(),
                     connectionTimeout))));
-    this.endpoint = endpoint;
+    this.endPoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
   @TestOnly
   public TEndPoint getTEndpoint() {
-    return endpoint;
+    return endPoint;
   }
 
   @TestOnly
@@ -75,7 +75,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
 
   public void close() {
     if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
+      clientManager.returnClient(endPoint, this);
     }
   }
 
@@ -88,13 +88,18 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client
     getInputProtocol().getTransport().close();
   }
 
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endPoint);
+  }
+
   public int getTimeout() throws SocketException {
     return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
+    return String.format("SyncDataNodeInternalServiceClient{%s}", endPoint);
   }
 
   public static class Factory
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
index 38eaa252b2..ff3862f879 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
@@ -22,4 +22,10 @@ public interface SyncThriftClient {
 
   /** close the connection */
   void invalidate();
+
+  /**
+   * Clears the specified pool, removing all pooled instances corresponding to current instance's
+   * endPoint.
+   */
+  void invalidateAll();
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
index 2792a2ba67..09a2a00946 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.client.sync;
 import net.sf.cglib.proxy.Enhancer;
 import net.sf.cglib.proxy.MethodInterceptor;
 import net.sf.cglib.proxy.MethodProxy;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.SocketException;
 
 public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
 
@@ -52,9 +54,22 @@ public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
       return methodProxy.invokeSuper(o, objects);
     } catch (InvocationTargetException e) {
       if (e.getTargetException() instanceof TException) {
-        LOGGER.error(
-            "Error in calling method {}, err: {}", method.getName(), e.getTargetException());
-        ((SyncThriftClient) o).invalidate();
+        Throwable rootCause = ExceptionUtils.getRootCause(e);
+        // if the exception is SocketException and its error message is Broken pipe, it means that
+        // the remote node may restart and all the connection we cached before should be cleared.
+        if (rootCause instanceof SocketException
+            && rootCause.getMessage().contains("Broken pipe")) {
+          LOGGER.error(
+              "Broken pipe error happened in calling method {}, we need to clear all previous cached connection, err: {}",
+              method.getName(),
+              e.getTargetException());
+          ((SyncThriftClient) o).invalidate();
+          ((SyncThriftClient) o).invalidateAll();
+        } else {
+          LOGGER.error(
+              "Error in calling method {}, err: {}", method.getName(), e.getTargetException());
+          ((SyncThriftClient) o).invalidate();
+        }
       }
       throw new TException("Error in calling method " + method.getName(), e.getTargetException());
     } catch (Exception e) {