You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/12 05:13:02 UTC

[iotdb] branch rel/0.13 updated: [IOTDB-2754] SessionPool auto redirect IoTDB instance (#5267) (#5313)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 5a30d37871 [IOTDB-2754] SessionPool auto redirect IoTDB instance (#5267) (#5313)
5a30d37871 is described below

commit 5a30d37871635c285173c82dbb27d069d5e8764e
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Apr 12 13:12:56 2022 +0800

    [IOTDB-2754] SessionPool auto redirect IoTDB instance (#5267) (#5313)
---
 docs/UserGuide/API/Programming-Java-Native-API.md  |   1 +
 .../UserGuide/API/Programming-Java-Native-API.md   |   3 +-
 .../java/org/apache/iotdb/SessionPoolExample.java  |  42 +++--
 .../java/org/apache/iotdb/session/Session.java     |  67 +++++---
 .../org/apache/iotdb/session/pool/SessionPool.java | 178 +++++++++++++++++++--
 5 files changed, 241 insertions(+), 50 deletions(-)

diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md b/docs/UserGuide/API/Programming-Java-Native-API.md
index 90b4c1e4a0..260ef3c76a 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -514,6 +514,7 @@ If you can not get a session connection in 60 seconds, there is a warning log bu
 If a session has finished an operation, it will be put back to the pool automatically.
 If a session connection is broken, the session will be removed automatically and the pool will try 
 to create a new session and redo the operation.
+To ensure high availability of clients in a distributed cluster, you can also specify a URL list of multiple reachable nodes when creating a SessionPool, just as you would when creating a Session.
 
 For query operations:
 
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index 5edd930d1f..1f7e378504 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -498,7 +498,8 @@ void testInsertTablets(Map<String, Tablet> tablets)
 如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
 
 当一个连接被用完后,他会自动返回池中等待下次被使用;
-当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作;
+你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。
 
 对于查询操作:
 
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 230849d25d..23a1895c22 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -32,12 +32,12 @@ import java.util.concurrent.Executors;
 
 public class SessionPoolExample {
 
-  private static SessionPool pool;
+  private static SessionPool sessionPool;
   private static ExecutorService service;
 
-  public static void main(String[] args)
-      throws StatementExecutionException, IoTDBConnectionException, InterruptedException {
-    pool =
+  /** Build a custom SessionPool for this example */
+  private static void constructCustomSessionPool() {
+    sessionPool =
         new SessionPool.Builder()
             .host("127.0.0.1")
             .port(6667)
@@ -45,13 +45,33 @@ public class SessionPoolExample {
             .password("root")
             .maxSize(3)
             .build();
-    service = Executors.newFixedThreadPool(10);
+  }
 
+  /** Build a redirect-able SessionPool for this example */
+  private static void constructRedirectSessionPool() {
+    List<String> nodeUrls = new ArrayList<>();
+    nodeUrls.add("127.0.0.1:6667");
+    nodeUrls.add("127.0.0.1:6668");
+    sessionPool =
+        new SessionPool.Builder()
+            .nodeUrls(nodeUrls)
+            .user("root")
+            .password("root")
+            .maxSize(3)
+            .build();
+  }
+
+  public static void main(String[] args)
+      throws StatementExecutionException, IoTDBConnectionException, InterruptedException {
+    // Choose the SessionPool you are going to use
+    constructRedirectSessionPool();
+
+    service = Executors.newFixedThreadPool(10);
     insertRecord();
     queryByRowRecord();
     Thread.sleep(1000);
     queryByIterator();
-    pool.close();
+    sessionPool.close();
     service.shutdown();
   }
 
@@ -72,7 +92,7 @@ public class SessionPoolExample {
       values.add(1L);
       values.add(2L);
       values.add(3L);
-      pool.insertRecord(deviceId, time, measurements, types, values);
+      sessionPool.insertRecord(deviceId, time, measurements, types, values);
     }
   }
 
@@ -82,7 +102,7 @@ public class SessionPoolExample {
           () -> {
             SessionDataSetWrapper wrapper = null;
             try {
-              wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
+              wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
               System.out.println(wrapper.getColumnNames());
               System.out.println(wrapper.getColumnTypes());
               while (wrapper.hasNext()) {
@@ -92,7 +112,7 @@ public class SessionPoolExample {
               e.printStackTrace();
             } finally {
               // remember to close data set finally!
-              pool.closeResultSet(wrapper);
+              sessionPool.closeResultSet(wrapper);
             }
           });
     }
@@ -104,7 +124,7 @@ public class SessionPoolExample {
           () -> {
             SessionDataSetWrapper wrapper = null;
             try {
-              wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
+              wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
               // get DataIterator like JDBC
               DataIterator dataIterator = wrapper.iterator();
               System.out.println(wrapper.getColumnNames());
@@ -120,7 +140,7 @@ public class SessionPoolExample {
               e.printStackTrace();
             } finally {
               // remember to close data set finally!
-              pool.closeResultSet(wrapper);
+              sessionPool.closeResultSet(wrapper);
             }
           });
     }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9a31211403..b7bf9d9185 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1022,8 +1022,11 @@ public class Session {
           genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, false);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1056,8 +1059,11 @@ public class Session {
           genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, true);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1139,8 +1145,11 @@ public class Session {
           genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, false);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1174,8 +1183,11 @@ public class Session {
           genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, true);
       try {
         defaultSessionConnection.insertRecords(request);
-      } catch (RedirectException ignored) {
-        // ignore
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1657,8 +1669,11 @@ public class Session {
           genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, false);
       try {
         defaultSessionConnection.insertTablets(request);
-      } catch (RedirectException ignored) {
-        // ignored
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -1692,8 +1707,11 @@ public class Session {
           genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, true);
       try {
         defaultSessionConnection.insertTablets(request);
-      } catch (RedirectException ignored) {
-        // ignored
+      } catch (RedirectException e) {
+        Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+        for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+          handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+        }
       }
     }
   }
@@ -2470,7 +2488,7 @@ public class Session {
     private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
     private Version version = Config.DEFAULT_VERSION;
 
-    List<String> nodeUrls = null;
+    private List<String> nodeUrls = null;
 
     public Builder host(String host) {
       this.host = host;
@@ -2535,16 +2553,19 @@ public class Session {
       }
 
       if (nodeUrls != null) {
-        return new Session(
-            nodeUrls,
-            username,
-            password,
-            fetchSize,
-            zoneId,
-            thriftDefaultBufferSize,
-            thriftMaxFrameSize,
-            enableCacheLeader,
-            version);
+        Session newSession =
+            new Session(
+                nodeUrls,
+                username,
+                password,
+                fetchSize,
+                zoneId,
+                thriftDefaultBufferSize,
+                thriftMaxFrameSize,
+                enableCacheLeader,
+                version);
+        newSession.setEnableQueryRedirection(true);
+        return newSession;
       }
 
       return new Session(
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 900f4185e4..37249b9971 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -93,6 +93,9 @@ public class SessionPool {
   // whether the queue is closed.
   private boolean closed;
 
+  // Redirect-able SessionPool
+  private final List<String> nodeUrls;
+
   public SessionPool(String host, int port, String user, String password, int maxSize) {
     this(
         host,
@@ -108,6 +111,20 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  public SessionPool(List<String> nodeUrls, String user, String password, int maxSize) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        false,
+        null,
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   public SessionPool(
       String host, int port, String user, String password, int maxSize, boolean enableCompression) {
     this(
@@ -124,6 +141,21 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  public SessionPool(
+      List<String> nodeUrls, String user, String password, int maxSize, boolean enableCompression) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        enableCompression,
+        null,
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   public SessionPool(
       String host,
       int port,
@@ -146,6 +178,26 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  public SessionPool(
+      List<String> nodeUrls,
+      String user,
+      String password,
+      int maxSize,
+      boolean enableCompression,
+      boolean enableCacheLeader) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        enableCompression,
+        null,
+        enableCacheLeader,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   public SessionPool(
       String host, int port, String user, String password, int maxSize, ZoneId zoneId) {
     this(
@@ -162,6 +214,21 @@ public class SessionPool {
         Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
+  public SessionPool(
+      List<String> nodeUrls, String user, String password, int maxSize, ZoneId zoneId) {
+    this(
+        nodeUrls,
+        user,
+        password,
+        maxSize,
+        Config.DEFAULT_FETCH_SIZE,
+        60_000,
+        false,
+        zoneId,
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+  }
+
   @SuppressWarnings("squid:S107")
   public SessionPool(
       String host,
@@ -178,6 +245,32 @@ public class SessionPool {
     this.maxSize = maxSize;
     this.host = host;
     this.port = port;
+    this.nodeUrls = null;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
+    this.enableCompression = enableCompression;
+    this.zoneId = zoneId;
+    this.enableCacheLeader = enableCacheLeader;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+  }
+
+  public SessionPool(
+      List<String> nodeUrls,
+      String user,
+      String password,
+      int maxSize,
+      int fetchSize,
+      long waitToGetSessionTimeoutInMs,
+      boolean enableCompression,
+      ZoneId zoneId,
+      boolean enableCacheLeader,
+      int connectionTimeoutInMs) {
+    this.maxSize = maxSize;
+    this.host = null;
+    this.port = -1;
+    this.nodeUrls = nodeUrls;
     this.user = user;
     this.password = password;
     this.fetchSize = fetchSize;
@@ -188,6 +281,35 @@ public class SessionPool {
     this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
+  private Session constructNewSession() {
+    Session session;
+    if (nodeUrls == null) {
+      // Construct custom Session
+      session =
+          new Session.Builder()
+              .host(host)
+              .port(port)
+              .username(user)
+              .password(password)
+              .fetchSize(fetchSize)
+              .zoneId(zoneId)
+              .enableCacheLeader(enableCacheLeader)
+              .build();
+    } else {
+      // Construct redirect-able Session
+      session =
+          new Session.Builder()
+              .nodeUrls(nodeUrls)
+              .username(user)
+              .password(password)
+              .fetchSize(fetchSize)
+              .zoneId(zoneId)
+              .enableCacheLeader(enableCacheLeader)
+              .build();
+    }
+    return session;
+  }
+
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
   // is incorrect.
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
@@ -254,9 +376,15 @@ public class SessionPool {
     if (shouldCreate) {
       // create a new one.
       if (logger.isDebugEnabled()) {
-        logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
+        if (nodeUrls == null) {
+          logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
+        } else {
+          logger.debug("Create a new redirect Session {}, {}, {}", nodeUrls, user, password);
+        }
       }
-      session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+      session = constructNewSession();
+
       try {
         session.open(enableCompression, connectionTimeoutInMs);
         // avoid someone has called close() the session pool
@@ -352,7 +480,7 @@ public class SessionPool {
 
   @SuppressWarnings({"squid:S2446"})
   private void tryConstructNewSession() {
-    Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+    Session session = constructNewSession();
     try {
       session.open(enableCompression, connectionTimeoutInMs);
       // avoid someone has called close() the session pool
@@ -2177,6 +2305,7 @@ public class SessionPool {
 
     private String host = Config.DEFAULT_HOST;
     private int port = Config.DEFAULT_PORT;
+    private List<String> nodeUrls = null;
     private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
     private String user = Config.DEFAULT_USER;
     private String password = Config.DEFAULT_PASSWORD;
@@ -2197,6 +2326,11 @@ public class SessionPool {
       return this;
     }
 
+    public Builder nodeUrls(List<String> nodeUrls) {
+      this.nodeUrls = nodeUrls;
+      return this;
+    }
+
     public Builder maxSize(int maxSize) {
       this.maxSize = maxSize;
       return this;
@@ -2243,18 +2377,32 @@ public class SessionPool {
     }
 
     public SessionPool build() {
-      return new SessionPool(
-          host,
-          port,
-          user,
-          password,
-          maxSize,
-          fetchSize,
-          waitToGetSessionTimeoutInMs,
-          enableCompression,
-          zoneId,
-          enableCacheLeader,
-          connectionTimeoutInMs);
+      if (nodeUrls == null) {
+        return new SessionPool(
+            host,
+            port,
+            user,
+            password,
+            maxSize,
+            fetchSize,
+            waitToGetSessionTimeoutInMs,
+            enableCompression,
+            zoneId,
+            enableCacheLeader,
+            connectionTimeoutInMs);
+      } else {
+        return new SessionPool(
+            nodeUrls,
+            user,
+            password,
+            maxSize,
+            fetchSize,
+            waitToGetSessionTimeoutInMs,
+            enableCompression,
+            zoneId,
+            enableCacheLeader,
+            connectionTimeoutInMs);
+      }
     }
   }
 }