You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/06/26 03:51:00 UTC

[GitHub] [iotdb] neuyilan commented on a change in pull request #3434: [IOTDB-1399]Add a session interface to connect multiple nodes

neuyilan commented on a change in pull request #3434:
URL: https://github.com/apache/iotdb/pull/3434#discussion_r656718436



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -240,6 +241,70 @@ public Session(
     this.enableCacheLeader = enableCacheLeader;
   }
 
+  public Session(List<String> nodeUrls, String username, String password) {

Review comment:
       It's better using java doc to specify the URL format clearly

##########
File path: session/src/main/java/org/apache/iotdb/session/SessionConnection.java
##########
@@ -722,20 +748,28 @@ protected void testInsertTablets(TSInsertTabletsReq request)
   private boolean reconnect() {
     boolean flag = false;
     for (int i = 1; i <= Config.RETRY_NUM; i++) {
-      try {
-        if (transport != null) {
-          close();
-          init(endPoint);
-          flag = true;
-        }
-      } catch (Exception e) {
-        try {
-          Thread.sleep(Config.RETRY_INTERVAL_MS);
-        } catch (InterruptedException e1) {
-          logger.error("reconnect is interrupted.", e1);
-          Thread.currentThread().interrupt();
+      if (transport != null) {
+        transport.close();
+        int currHostIndex = endPointList.indexOf(session.defaultEndPoint);
+        for (int j = currHostIndex; j < endPointList.size(); j++) {

Review comment:
       If `currHostIndex` is the last index of `endPointList`, in your implementation, it can not try the others nodes again.

##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -240,6 +241,70 @@ public Session(
     this.enableCacheLeader = enableCacheLeader;
   }
 
+  public Session(List<String> nodeUrls, String username, String password) {
+    this(
+        nodeUrls,
+        username,
+        password,
+        Config.DEFAULT_FETCH_SIZE,
+        null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
+        Config.DEFAULT_MAX_FRAME_SIZE,
+        Config.DEFAULT_CACHE_LEADER_MODE);
+  }
+
+  public Session(
+      List<String> nodeUrls,
+      String username,
+      String password,
+      int fetchSize,
+      ZoneId zoneId,
+      int thriftDefaultBufferSize,
+      int thriftMaxFrameSize,
+      boolean enableCacheLeader) {
+    this.nodeUrls = nodeUrls;
+    this.username = username;
+    this.password = password;
+    this.fetchSize = fetchSize;
+    this.zoneId = zoneId;
+    this.thriftDefaultBufferSize = thriftDefaultBufferSize;
+    this.thriftMaxFrameSize = thriftMaxFrameSize;
+    this.enableCacheLeader = enableCacheLeader;
+  }
+
+  public synchronized void clusterOpen() throws IoTDBConnectionException {

Review comment:
       +1, I don't think it's necessary to change the default behavior of current users using session, in other words, I think we just need to add one interface like[1], then the user can just call the following codes[2] like before;
   [1] `public Session(List<String> nodeUrls, String username, String password) `
   [2] 
   
   ` 
   
       session= new Session(nodeUrls, "root", "root");
       session.open(false);
       session.setFetchSize(10000);
       try {
         session.setStorageGroup("root.sg1");
       } catch (StatementExecutionException e) {
         if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
           throw e;
         }
       }
   `

##########
File path: session/src/main/java/org/apache/iotdb/session/SessionUtils.java
##########
@@ -149,4 +158,32 @@ private static void getValueBufferOfDataType(
             String.format("Data type %s is not supported.", dataType));
     }
   }
+
+  public static List<EndPoint> parseSeedNodeUrls(List<String> nodeUrls) {
+    if (nodeUrls == null) {
+      return Collections.emptyList();
+    }
+    List<EndPoint> endPointsList = new ArrayList<>();
+    for (String nodeUrl : nodeUrls) {
+      EndPoint endPoint = parseNodeUrl(nodeUrl);
+      endPointsList.add(endPoint);
+    }
+    return endPointsList;
+  }
+
+  private static EndPoint parseNodeUrl(String nodeUrl) {
+    EndPoint endPoint = new EndPoint();
+    String[] split = nodeUrl.split(":");
+    if (split.length != 2) {
+      return null;
+    }
+    String ip = split[0];
+    try {
+      int rpcPort = Integer.parseInt(split[1]);
+      return endPoint.setIp(ip).setPort(rpcPort);
+    } catch (NumberFormatException e) {
+      logger.warn("parse url fail {}", nodeUrl);
+    }
+    return endPoint;
+  }

Review comment:
       If the user given one wrong URL format,  in the code implement, the `NumberFormatException` may occur, you will return one `empty` endPoint.
   In my opinion, if the seed URL format error, It's better rethrow the exception to the user. 

##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -525,16 +584,12 @@ public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
   private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
       throws StatementExecutionException, IoTDBConnectionException {
     try {
-      logger.info("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql);
+      logger.info("{} execute sql {}", defaultEndPoint, sql);
       return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
     } catch (RedirectException e) {
       handleQueryRedirection(e.getEndPoint());
       if (enableQueryRedirection) {
-        logger.debug(
-            "{} redirect query {} to {}",
-            defaultSessionConnection.getEndPoint(),
-            sql,
-            e.getEndPoint());
+        logger.debug("{} redirect query {} to {}", defaultEndPoint, sql, e.getEndPoint());

Review comment:
       the same as above

##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -525,16 +584,12 @@ public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
   private SessionDataSet executeStatementMayRedirect(String sql, long timeoutInMs)
       throws StatementExecutionException, IoTDBConnectionException {
     try {
-      logger.info("{} execute sql {}", defaultSessionConnection.getEndPoint(), sql);
+      logger.info("{} execute sql {}", defaultEndPoint, sql);

Review comment:
       `defaultSessionConnection.getEndPoint() `may not equal `defaultEndPoint` as long as we enable redirection

##########
File path: session/src/main/java/org/apache/iotdb/session/SessionUtils.java
##########
@@ -149,4 +158,32 @@ private static void getValueBufferOfDataType(
             String.format("Data type %s is not supported.", dataType));
     }
   }
+
+  public static List<EndPoint> parseSeedNodeUrls(List<String> nodeUrls) {
+    if (nodeUrls == null) {
+      return Collections.emptyList();
+    }
+    List<EndPoint> endPointsList = new ArrayList<>();
+    for (String nodeUrl : nodeUrls) {
+      EndPoint endPoint = parseNodeUrl(nodeUrl);
+      endPointsList.add(endPoint);

Review comment:
       `parseNodeUrl`  method may return `null`, we can not add one `null` endpoint to the `endPointsList`

##########
File path: session/src/main/java/org/apache/iotdb/session/SessionConnection.java
##########
@@ -145,6 +153,24 @@ private void init(EndPoint endPoint) throws IoTDBConnectionException {
     }
   }
 
+  private void initClusterConn() throws IoTDBConnectionException {
+    for (EndPoint endPoint : endPointList) {
+      if (endPoint == null) {
+        continue;
+      }

Review comment:
       Its better not allow `null` endPoint added in the `endPointList` in the previous `parseSeedNodeUrls` method. In other words, abnormal judgment should be made as soon as possible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org