You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/07/12 08:17:07 UTC

[iotdb] branch fix_clientpool_timeout updated (f4e6435 -> b311974)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch fix_clientpool_timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


 discard f4e6435  init
     new b311974  add dataClientRefresher

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f4e6435)
            \
             N -- N -- N   refs/heads/fix_clientpool_timeout (b311974)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/cluster/client/DataClientProvider.java   |  9 +++
 .../cluster/client/async/AsyncClientPool.java      | 14 +++++
 .../iotdb/cluster/client/sync/SyncClientPool.java  | 21 +++----
 .../iotdb/cluster/server/DataClusterServer.java    | 12 ++--
 .../iotdb/cluster/server/MetaClusterServer.java    |  8 ++-
 .../cluster/server/member/MetaGroupMember.java     | 72 +++++++++++++++-------
 .../cluster/server/service/BaseAsyncService.java   |  4 +-
 .../cluster/server/service/BaseSyncService.java    |  4 +-
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |  3 +-
 thrift-cluster/src/main/thrift/cluster.thrift      |  8 +--
 10 files changed, 103 insertions(+), 52 deletions(-)

[iotdb] 01/01: add dataClientRefresher

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch fix_clientpool_timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b31197427a038034b1c187267b021dab1806bcb1
Author: LebronAl <TX...@gmail.com>
AuthorDate: Mon Jul 12 16:16:16 2021 +0800

    add dataClientRefresher
---
 .../iotdb/cluster/client/DataClientProvider.java   | 18 ++++++
 .../cluster/client/async/AsyncClientPool.java      | 14 +++++
 .../iotdb/cluster/client/sync/SyncClientPool.java  | 13 ++++
 .../iotdb/cluster/server/DataClusterServer.java    |  9 +++
 .../iotdb/cluster/server/MetaClusterServer.java    |  9 +++
 .../cluster/server/member/MetaGroupMember.java     | 72 ++++++++++++++++++++++
 .../cluster/server/service/BaseAsyncService.java   |  4 ++
 .../cluster/server/service/BaseSyncService.java    |  4 ++
 .../org/apache/iotdb/db/metadata/MManager.java     |  3 +
 thrift-cluster/src/main/thrift/cluster.thrift      |  6 +-
 10 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 8b954ec..e93cfc7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -74,6 +74,15 @@ public class DataClientProvider {
     return client;
   }
 
+  public AsyncDataClient getAsyncDataClientForRefresh(Node node, int timeout) throws IOException {
+    AsyncDataClient client = (AsyncDataClient) getDataAsyncClientPool().getClientForRefresh(node);
+    if (client == null) {
+      throw new IOException("can not get client for node=" + node);
+    }
+    client.setTimeout(timeout);
+    return client;
+  }
+
   /**
    * IMPORTANT!!! After calling this function, the caller should make sure to call {@link
    * org.apache.iotdb.cluster.utils.ClientUtils#putBackSyncClient(Client)} to put the client back
@@ -92,4 +101,13 @@ public class DataClientProvider {
     client.setTimeout(timeout);
     return client;
   }
+
+  public SyncDataClient getSyncDataClientForRefresh(Node node, int timeout) throws TException {
+    SyncDataClient client = (SyncDataClient) getDataSyncClientPool().getClientForRefresh(node);
+    if (client == null) {
+      throw new TException("can not get client for node=" + node);
+    }
+    client.setTimeout(timeout);
+    return client;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 4ba3fe9..87e0ec7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -52,6 +52,20 @@ public class AsyncClientPool {
         ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
   }
 
+  public AsyncClient getClientForRefresh(Node node) {
+    ClusterNode clusterNode = new ClusterNode(node);
+    // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
+      if (clientStack.isEmpty()) {
+        return null;
+      } else {
+        return clientStack.poll();
+      }
+    }
+  }
+
   /**
    * See getClient(Node node, boolean activatedOnly)
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 2c279c0..8d36b98 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -51,6 +51,19 @@ public class SyncClientPool {
         ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
   }
 
+  public Client getClientForRefresh(Node node) {
+    ClusterNode clusterNode = new ClusterNode(node);
+    // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
+    Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
+      if (clientStack.isEmpty()) {
+        return null;
+      } else {
+        return clientStack.poll();
+      }
+    }
+  }
+
   /**
    * See getClient(Node node, boolean activatedOnly)
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e4c81f8..e8c2581 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RefreshReuqest;
 import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
@@ -308,6 +309,11 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public void RefreshConnection(RefreshReuqest request, AsyncMethodCallback<Void> resultHandler) {
+    resultHandler.onComplete(null);
+  }
+
+  @Override
   public void requestCommitIndex(
       Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
     DataAsyncService service = getDataAsyncService(header, resultHandler, "Request commit index");
@@ -921,6 +927,9 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public void RefreshConnection(RefreshReuqest request) {}
+
+  @Override
   public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException {
     return getDataSyncService(header).requestCommitIndex(header);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index abb8020..98f1fa1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RefreshReuqest;
 import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
@@ -226,6 +227,11 @@ public class MetaClusterServer extends RaftServer
   }
 
   @Override
+  public void RefreshConnection(RefreshReuqest request, AsyncMethodCallback<Void> resultHandler) {
+    resultHandler.onComplete(null);
+  }
+
+  @Override
   public void requestCommitIndex(
       Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
     asyncService.requestCommitIndex(header, resultHandler);
@@ -334,6 +340,9 @@ public class MetaClusterServer extends RaftServer
   }
 
   @Override
+  public void RefreshConnection(RefreshReuqest request) {}
+
+  @Override
   public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException {
     return syncService.requestCommitIndex(header);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 58884e1..f4e0741 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -56,6 +56,8 @@ import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.rpc.thrift.RefreshReuqest;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
@@ -163,6 +165,12 @@ public class MetaGroupMember extends RaftMember {
    */
   private static final int REPORT_INTERVAL_SEC = 10;
 
+  /**
+   * every "REFRESH_CLIENT_SEC" seconds, a dataClientRefresher thread will try to refresh one thrift
+   * connection for each nodes other than itself.
+   */
+  private static final int REFRESH_CLIENT_SEC = 1;
+
   /** how many times is a data record replicated, also the number of nodes in a data group */
   private static final int REPLICATION_NUM =
       ClusterDescriptor.getInstance().getConfig().getReplicationNum();
@@ -211,6 +219,8 @@ public class MetaGroupMember extends RaftMember {
 
   private DataClientProvider dataClientProvider;
 
+  private ScheduledExecutorService dataClientRefresher;
+
   /**
    * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread" will print the status
    * of all raft members in this node
@@ -331,6 +341,8 @@ public class MetaGroupMember extends RaftMember {
         Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "NodeReportThread"));
     hardLinkCleanerThread =
         Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "HardLinkCleaner"));
+    dataClientRefresher =
+        Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "DataClientRefresher"));
   }
 
   /**
@@ -349,6 +361,15 @@ public class MetaGroupMember extends RaftMember {
     if (clientServer != null) {
       clientServer.stop();
     }
+    if (dataClientRefresher != null) {
+      dataClientRefresher.shutdownNow();
+      try {
+        dataClientRefresher.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.error("Unexpected interruption when waiting for reportThread to end", e);
+      }
+    }
     if (reportThread != null) {
       reportThread.shutdownNow();
       try {
@@ -460,6 +481,57 @@ public class MetaGroupMember extends RaftMember {
         CLEAN_HARDLINK_INTERVAL_SEC,
         CLEAN_HARDLINK_INTERVAL_SEC,
         TimeUnit.SECONDS);
+    dataClientRefresher.scheduleAtFixedRate(
+        this::RefreshClientOnce, REFRESH_CLIENT_SEC, REFRESH_CLIENT_SEC, TimeUnit.SECONDS);
+  }
+
+  private void RefreshClientOnce() {
+    for (Node receiver : allNodes) {
+      if (!receiver.equals(thisNode)) {
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          RefreshClientOnceAsync(receiver);
+        } else {
+          RefreshClientOnceSync(receiver);
+        }
+      }
+    }
+  }
+
+  private void RefreshClientOnceSync(Node receiver) {
+    RaftService.Client client;
+    try {
+      client =
+          getClientProvider()
+              .getSyncDataClientForRefresh(receiver, RaftServer.getWriteOperationTimeoutMS());
+    } catch (TException e) {
+      return;
+    }
+    try {
+      RefreshReuqest req = new RefreshReuqest();
+      client.RefreshConnection(req);
+    } catch (TException e) {
+      logger.warn("encounter refreshing client timeout, throw broken connection", e);
+      // the connection may be broken, close it to avoid it being reused
+      client.getInputProtocol().getTransport().close();
+    } finally {
+      ClientUtils.putBackSyncClient(client);
+    }
+  }
+
+  private void RefreshClientOnceAsync(Node receiver) {
+    RaftService.AsyncClient client;
+    try {
+      client =
+          getClientProvider()
+              .getAsyncDataClientForRefresh(receiver, RaftServer.getWriteOperationTimeoutMS());
+    } catch (IOException e) {
+      return;
+    }
+    try {
+      client.RefreshConnection(new RefreshReuqest(), new GenericHandler<>(receiver, null));
+    } catch (TException e) {
+      logger.warn("encounter refreshing client timeout, throw broken connection", e);
+    }
   }
 
   private void generateNodeReport() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 8673078..1a9527a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+import org.apache.iotdb.cluster.rpc.thrift.RefreshReuqest;
 import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -145,6 +146,9 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   }
 
   @Override
+  public void RefreshConnection(RefreshReuqest request, AsyncMethodCallback<Void> resultHandler) {}
+
+  @Override
   public void executeNonQueryPlan(
       ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
     if (member.getCharacter() != NodeCharacter.LEADER) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index ce200ab..a6f0c4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.RefreshReuqest;
 import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -152,6 +153,9 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
+  public void RefreshConnection(RefreshReuqest request) {}
+
+  @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
     if (member.getCharacter() != NodeCharacter.LEADER) {
       // forward the plan to the leader
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 072ecdd..6eda19a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -984,6 +984,9 @@ public class MManager {
       throws MetadataException {
     MNode deviceMNode = getDeviceNode(device);
     MeasurementMNode measurementMNode = (MeasurementMNode) deviceMNode.getChild(measurement);
+    if (measurementMNode == null) {
+      return getSeriesSchema(device.concatNode(measurement));
+    }
     return measurementMNode.getSchema();
   }
 
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index f23130e..dfbe655 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -261,6 +261,8 @@ struct GetAllPathsResult {
   2: optional list<string> aliasList
 }
 
+struct RefreshReuqest {}
+
 
 service RaftService {
   /**
@@ -336,7 +338,9 @@ service RaftService {
   * When a follower finds that it already has a file in a snapshot locally, it calls this
   * interface to notify the leader to remove the associated hardlink.
   **/
-  void removeHardLink(1: string hardLinkPath)
+  void removeHardLink(1:string hardLinkPath)
+
+  void RefreshConnection(1:RefreshReuqest request)
 }