You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/09/17 03:49:23 UTC

[incubator-iotdb] branch fix_session_tsocket created (now 6b22dfa)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch fix_session_tsocket
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 6b22dfa  add reconnect and log in session

This branch includes the following new commits:

     new 6b22dfa  add reconnect and log in session

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: add reconnect and log in session

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_session_tsocket
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 6b22dfa4d483d2b51fc3655204ad1a50ec15bae8
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Sep 17 11:49:01 2019 +0800

    add reconnect and log in session
---
 .../main/java/org/apache/iotdb/session/Config.java |  3 +
 .../java/org/apache/iotdb/session/Session.java     | 96 +++++++++++++++++-----
 2 files changed, 80 insertions(+), 19 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index 3bc2344..91564cd 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -23,4 +23,7 @@ public class Config {
   public static final String DEFAULT_USER = "user";
   public static final String DEFAULT_PASSWORD = "password";
 
+  public static final int RETRY_NUM = 3;
+  public static final long RETRY_INTERVAL = 1000;
+  public static int connectionTimeoutInMs = 0;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 50e00fa..6417aa5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.session;
 
+import java.sql.SQLException;
 import java.util.List;
 import org.apache.iotdb.rpc.IoTDBRPCException;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusType;
 import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -34,9 +36,12 @@ import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 import java.time.ZoneId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Session {
 
+  private static final Logger logger = LoggerFactory.getLogger(Session.class);
   private String host;
   private int port;
   private String username;
@@ -47,6 +52,7 @@ public class Session {
   private TSocket transport;
   private boolean isClosed = true;
   private ZoneId zoneId;
+  private boolean enableRPCCompression;
 
   public Session(String host, int port) {
     this(host, port, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -64,23 +70,62 @@ public class Session {
   }
 
   public synchronized void open() throws IoTDBSessionException {
-    open(false, 0);
+    open(false);
   }
 
-  public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+  public synchronized void open(boolean enableRPCCompression)
       throws IoTDBSessionException {
     if (!isClosed) {
       return;
     }
-    transport = new TSocket(host, port, connectionTimeoutInMs);
-    if (!transport.isOpen()) {
+
+    this.enableRPCCompression = enableRPCCompression;
+
+    try {
+      openTransport();
+    } catch (TTransportException e) {
+      throw new IoTDBSessionException(e);
+    }
+    openSession();
+    isClosed = false;
+    client = RpcUtils.newSynchronizedClient(client);
+
+  }
+
+  public boolean reconnect() throws IoTDBSessionException {
+    boolean flag = false;
+    for (int i = 1; i <= Config.RETRY_NUM; i++) {
       try {
-        transport.open();
-      } catch (TTransportException e) {
-        throw new IoTDBSessionException(e);
+        if (transport != null) {
+          transport.close();
+          openTransport();
+          openSession();
+          client = RpcUtils.newSynchronizedClient(client);
+          flag = true;
+          break;
+        }
+      } catch (Exception e) {
+        try {
+          Thread.sleep(Config.RETRY_INTERVAL);
+        } catch (InterruptedException e1) {
+          logger.error("reconnect is interrupted.", e1);
+        }
       }
     }
+    if (!flag) {
+      throw new IoTDBSessionException("Fail to reconnect to server. please check server status");
+    }
+    return flag;
+  }
 
+  private void openTransport() throws TTransportException {
+    transport = new TSocket(host, port, Config.connectionTimeoutInMs);
+    if (!transport.isOpen()) {
+      transport.open();
+    }
+  }
+
+  private void openSession() throws IoTDBSessionException {
     if(enableRPCCompression) {
       client = new TSIService.Client(new TCompactProtocol(transport));
     }
@@ -89,20 +134,26 @@ public class Session {
     }
 
     TSOpenSessionReq openReq = new TSOpenSessionReq(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V1);
+
     openReq.setUsername(username);
     openReq.setPassword(password);
 
     try {
       TSOpenSessionResp openResp = client.openSession(openReq);
 
-      RpcUtils.verifySuccess(openResp.getStatus());
-
+      // validate connection
+      try {
+        RpcUtils.verifySuccess(openResp.getStatus());
+      } catch (IoTDBRPCException e) {
+        // failed to connect, disconnect from the server
+        transport.close();
+        throw new IoTDBSessionException(e.getMessage());
+      }
       if (protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) {
         throw new TException(String
             .format("Protocol not supported, Client version is {}, but Server version is {}",
                 protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue()));
       }
-
       sessionHandle = openResp.getSessionHandle();
 
       if (zoneId != null) {
@@ -112,14 +163,10 @@ public class Session {
       }
 
     } catch (TException | IoTDBRPCException e) {
-      transport.close();
-      throw new IoTDBSessionException(String.format("Can not open session to %s:%s with user: %s.",
-          host, port, username), e);
+      throw new IoTDBSessionException(String.format("Can not establish connection to %s.",
+          host), e);
     }
     isClosed = false;
-
-    client = RpcUtils.newSynchronizedClient(client);
-
   }
 
   public synchronized void close() throws IoTDBSessionException {
@@ -153,6 +200,7 @@ public class Session {
     try {
       return client.insertBatch(request);
     } catch (TException e) {
+      reconnect();
       throw new IoTDBSessionException(e);
     }
   }
@@ -166,8 +214,9 @@ public class Session {
     request.setValues(values);
 
     try {
-      return client.insertRow(request);
+      return checkAndReturn(client.insertRow(request));
     } catch (TException e) {
+      reconnect();
       throw new IoTDBSessionException(e);
     }
   }
@@ -177,8 +226,9 @@ public class Session {
     request.setStorageGroupId(storageGroupId);
 
     try {
-      return client.setStorageGroup(request);
+      return checkAndReturn(client.setStorageGroup(request));
     } catch (TException e) {
+      reconnect();
       throw new IoTDBSessionException(e);
     }
   }
@@ -191,12 +241,20 @@ public class Session {
     request.setCompressor(compressor.ordinal());
 
     try {
-      return client.createTimeseries(request);
+      return checkAndReturn(client.createTimeseries(request));
     } catch (TException e) {
+      reconnect();
       throw new IoTDBSessionException(e);
     }
   }
 
+  private TSRPCResp checkAndReturn(TSRPCResp resp) {
+    if (resp.status.statusType.getCode() != TSStatusType.SUCCESS_STATUS.getStatusCode()) {
+      logger.error(resp.status.statusType.getMessage());
+    }
+    return resp;
+  }
+
   public synchronized String getTimeZone() throws TException, IoTDBRPCException {
     if (zoneId != null) {
       return zoneId.toString();