You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/26 12:13:54 UTC

[iotdb] branch SessionInfo created (now ec7bde68db)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch SessionInfo
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at ec7bde68db [To rel/0.13] [IOTDB-4767] Support fetching all connection info in session api

This branch includes the following new commits:

     new ec7bde68db [To rel/0.13] [IOTDB-4767] Support fetching all connection info in session api

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [To rel/0.13] [IOTDB-4767] Support fetching all connection info in session api

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SessionInfo
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ec7bde68db08492e8ad611d08ff569149b387aee
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Oct 26 20:13:37 2022 +0800

    [To rel/0.13] [IOTDB-4767] Support fetching all connection info in session api
---
 .../iotdb/session/IoTDBConnectionInfoIT.java       | 93 ++++++++++++++++++++++
 .../apache/iotdb/session/pool/SessionPoolTest.java | 35 ++++++++
 .../iotdb/db/query/control/SessionManager.java     | 12 +++
 .../query/control/clientsession/ClientSession.java | 16 +++-
 .../control/clientsession/IClientSession.java      | 34 ++++++--
 .../control/clientsession/MqttClientSession.java   | 14 +++-
 .../db/service/thrift/impl/TSServiceImpl.java      |  6 ++
 .../java/org/apache/iotdb/session/Session.java     |  5 ++
 .../apache/iotdb/session/SessionConnection.java    | 17 ++++
 .../org/apache/iotdb/session/pool/SessionPool.java | 21 +++++
 thrift/src/main/thrift/rpc.thrift                  | 18 +++++
 11 files changed, 262 insertions(+), 9 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/session/IoTDBConnectionInfoIT.java b/integration/src/test/java/org/apache/iotdb/session/IoTDBConnectionInfoIT.java
new file mode 100644
index 0000000000..d316eed909
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/session/IoTDBConnectionInfoIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.ZoneId;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class IoTDBConnectionInfoIT {
+
+  @Before
+  public void setUp() {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testGetBackupConfiguration() throws IoTDBConnectionException {
+    Session session1 = new Session("127.0.0.1", 6667, "root", "root", ZoneId.of("+05:00"));
+    Session session2 = new Session("127.0.0.1", 6667, "root", "root", ZoneId.of("+05:00"));
+    try {
+      session1.open();
+      session2.open();
+      TSConnectionInfoResp resp1 = session1.fetchAllConnections();
+      TSConnectionInfoResp resp2 = session2.fetchAllConnections();
+      assertEquals(2, resp1.connectionInfoList.size());
+      assertEquals("root", resp1.connectionInfoList.get(0).userName);
+      assertEquals("root", resp1.connectionInfoList.get(1).userName);
+      assertEquals(TSConnectionType.THRIFT_BASED, resp1.connectionInfoList.get(0).type);
+      assertEquals(TSConnectionType.THRIFT_BASED, resp1.connectionInfoList.get(1).type);
+      assertTrue(resp1.connectionInfoList.get(0).connectionId.startsWith("127.0.0.1"));
+      assertTrue(resp1.connectionInfoList.get(1).connectionId.startsWith("127.0.0.1"));
+      // the second session must report the same two thrift connections as the first
+      assertEquals(2, resp2.connectionInfoList.size());
+      assertEquals("root", resp2.connectionInfoList.get(0).userName);
+      assertEquals("root", resp2.connectionInfoList.get(1).userName);
+      assertEquals(TSConnectionType.THRIFT_BASED, resp2.connectionInfoList.get(0).type);
+      assertEquals(TSConnectionType.THRIFT_BASED, resp2.connectionInfoList.get(1).type);
+      assertTrue(resp2.connectionInfoList.get(0).connectionId.startsWith("127.0.0.1"));
+      assertTrue(resp2.connectionInfoList.get(1).connectionId.startsWith("127.0.0.1"));
+      // results are sorted by login time, so both responses must match entry by entry
+      assertEquals(
+          resp1.connectionInfoList.get(0).connectionId,
+          resp2.connectionInfoList.get(0).connectionId);
+      assertEquals(
+          resp1.connectionInfoList.get(1).connectionId,
+          resp2.connectionInfoList.get(1).connectionId);
+      assertEquals(
+          resp1.connectionInfoList.get(0).logInTime, resp2.connectionInfoList.get(0).logInTime);
+      assertEquals(
+          resp1.connectionInfoList.get(1).logInTime, resp2.connectionInfoList.get(1).logInTime);
+
+    } catch (Exception e) {
+      fail(e.getMessage());
+    } finally {
+      session1.close();
+      session2.close();
+    }
+  }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index ff51933e4b..22f5bb23b0 100644
--- a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
 import org.apache.iotdb.session.Config;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -479,4 +481,37 @@ public class SessionPoolTest {
     assertEquals(3, pool.getConnectionTimeoutInMs());
     assertEquals(ZoneOffset.UTC, pool.getZoneId());
   }
+
+  @Test
+  public void testFetchConnections() {
+    SessionPool pool =
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+
+    try {
+
+      TSConnectionInfoResp resp = pool.fetchAllConnections();
+
+      assertEquals(1, resp.connectionInfoList.size());
+      assertEquals("root", resp.connectionInfoList.get(0).userName);
+      assertEquals(TSConnectionType.THRIFT_BASED, resp.connectionInfoList.get(0).type);
+      assertTrue(resp.connectionInfoList.get(0).connectionId.startsWith("127.0.0.1"));
+
+    } catch (IoTDBConnectionException e) {
+      logger.error("testFetchConnections failed", e);
+      fail(e.getMessage());
+    } finally {
+      pool.close();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 025c8bc423..294be39662 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -23,12 +23,15 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
@@ -143,6 +146,7 @@ public class SessionManager implements SessionManagerMBean {
     session.setZoneId(ZoneId.of(zoneId));
     session.setClientVersion(clientVersion);
     session.setLogin(true);
+    session.setLogInTime(System.currentTimeMillis());
   }
 
   /**
@@ -264,6 +268,14 @@ public class SessionManager implements SessionManagerMBean {
     return this.sessions.keySet().stream().map(x -> x.toString()).collect(Collectors.toSet());
   }
 
+  public TSConnectionInfoResp getAllConnectionInfo() {
+    return new TSConnectionInfoResp(
+        sessions.keySet().stream()
+            .map(IClientSession::convertToTSConnectionInfo)
+            .sorted(Comparator.comparingLong(TSConnectionInfo::getLogInTime))
+            .collect(Collectors.toList()));
+  }
+
   private static class SessionManagerHelper {
 
     private static final SessionManager INSTANCE = new SessionManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
index c2a56a8a13..0622715787 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.query.control.clientsession;
 
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
 import java.io.IOException;
 import java.net.Socket;
 import java.util.Set;
@@ -26,10 +28,10 @@ import java.util.concurrent.CopyOnWriteArraySet;
 /** Client Session is the only identity for a connection. */
 public class ClientSession extends IClientSession {
 
-  Socket clientSocket;
+  private final Socket clientSocket;
 
   // TODO why we use copyOnWriteArraySet instead of HashSet??
-  Set<Long> statements = new CopyOnWriteArraySet();
+  private final Set<Long> statements = new CopyOnWriteArraySet<>();
 
   public ClientSession(Socket clientSocket) {
     this.clientSocket = clientSocket;
@@ -45,6 +47,16 @@ public class ClientSession extends IClientSession {
     return clientSocket.getPort();
   }
 
+  @Override
+  TSConnectionType getConnectionType() {
+    return TSConnectionType.THRIFT_BASED;
+  }
+
+  @Override
+  String getConnectionId() {
+    return String.format("%s:%s", getClientAddress(), getClientPort());
+  }
+
   @Override
   public Set<Long> getStatementIds() {
     return statements;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
index b531db45a8..e55e4a4781 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.db.query.control.clientsession;
 
 import org.apache.iotdb.db.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
 
 import java.time.ZoneId;
 import java.util.Set;
@@ -27,23 +29,30 @@ import java.util.TimeZone;
 public abstract class IClientSession {
 
   /** id is just used for keep compatible with v0.13 */
-  @Deprecated long id;
+  @Deprecated private long id;
 
-  ClientVersion clientVersion;
+  private ClientVersion clientVersion;
 
-  ZoneId zoneId;
+  private ZoneId zoneId;
 
   // TODO: why some Statement Plans use timeZone while others use ZoneId?
-  TimeZone timeZone;
+  private TimeZone timeZone;
 
-  String username;
+  private String username;
 
-  boolean login = false;
+  private boolean login = false;
+
+  private long logInTime;
 
   abstract String getClientAddress();
 
   abstract int getClientPort();
 
+  abstract TSConnectionType getConnectionType();
+
+  /** ip:port for thrift-based service and client id for mqtt-based service */
+  abstract String getConnectionId();
+
   public void setClientVersion(ClientVersion clientVersion) {
     this.clientVersion = clientVersion;
   }
@@ -86,6 +95,14 @@ public abstract class IClientSession {
     this.login = login;
   }
 
+  public void setLogInTime(long logInTime) {
+    this.logInTime = logInTime;
+  }
+
+  public long getLogInTime() {
+    return logInTime;
+  }
+
   @Deprecated
   public long getId() {
     return id;
@@ -101,6 +118,11 @@ public abstract class IClientSession {
         "%d-%s:%s:%d", getId(), getUsername(), getClientAddress(), getClientPort());
   }
 
+  public TSConnectionInfo convertToTSConnectionInfo() {
+    return new TSConnectionInfo(
+        getUsername(), getLogInTime(), getConnectionId(), getConnectionType());
+  }
+
   /**
    * statementIds that this client opens.<br>
    * For JDBC clients, each Statement instance has a statement id.<br>
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
index 89acd74df7..a792c6f00f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iotdb.db.query.control.clientsession;
 
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
 import java.util.Collections;
 import java.util.Set;
 
 public class MqttClientSession extends IClientSession {
 
-  String clientID;
+  private final String clientID;
 
   public MqttClientSession(String clientID) {
     this.clientID = clientID;
@@ -43,6 +45,16 @@ public class MqttClientSession extends IClientSession {
     return 0;
   }
 
+  @Override
+  TSConnectionType getConnectionType() {
+    return TSConnectionType.MQTT_BASED;
+  }
+
+  @Override
+  String getConnectionId() {
+    return clientID;
+  }
+
   public String toString() {
     return String.format("%d-%s", getId(), getClientID());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index aebb170c86..eefc352bc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -91,6 +91,7 @@ import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
 import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -2209,6 +2210,11 @@ public class TSServiceImpl implements TSIService.Iface {
     return syncConf;
   }
 
+  @Override
+  public TSConnectionInfoResp fetchAllConnectionsInfo() {
+    return SESSION_MANAGER.getAllConnectionInfo();
+  }
+
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
     try {
       if (isEnableOperationSync) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index ba7e1fce6b..964eef770e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -3110,6 +3111,10 @@ public class Session {
     return defaultSessionConnection.getBackupConfiguration();
   }
 
+  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+    return defaultSessionConnection.fetchAllConnections();
+  }
+
   public static class Builder {
     private String host = Config.DEFAULT_HOST;
     private int rpcPort = Config.DEFAULT_PORT;
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index f2316f3651..1f0c6b6919 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -1103,6 +1104,22 @@ public class SessionConnection {
     return MSG_RECONNECTION_FAIL.concat(urls.toString());
   }
 
+  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+    try {
+      return client.fetchAllConnectionsInfo();
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          return client.fetchAllConnectionsInfo();
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+    }
+  }
+
   @Override
   public String toString() {
     return "SessionConnection{" + " endPoint=" + endPoint + "}";
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 031ce64d6b..711f7f66df 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.session.pool;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
 import org.apache.iotdb.session.Config;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionDataSet;
@@ -2405,6 +2406,26 @@ public class SessionPool {
     return connectionTimeoutInMs;
   }
 
+  public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        TSConnectionInfoResp resp = session.fetchAllConnections();
+        putBack(session);
+        return resp;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken, remove it and get a new one.
+        logger.warn("fetchAllConnections failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (Throwable t) {
+        putBack(session);
+        throw t;
+      }
+    }
+    return null;
+  }
+
   public static class Builder {
 
     private String host = Config.DEFAULT_HOST;
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 43d79d23b4..830bede11b 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -439,6 +439,22 @@ struct TSBackupConfigurationResp {
   4: optional i32 secondaryPort
 }
 
+enum TSConnectionType {
+  THRIFT_BASED
+  MQTT_BASED
+}
+
+struct TSConnectionInfo {
+  1: required string userName
+  2: required i64 logInTime
+  3: required string connectionId // ip:port for thrift-based service and clientId for mqtt-based service
+  4: required TSConnectionType type
+}
+
+struct TSConnectionInfoResp {
+  1: required list<TSConnectionInfo> connectionInfoList
+}
+
 service TSIService {
   TSOpenSessionResp openSession(1:TSOpenSessionReq req);
 
@@ -539,4 +555,6 @@ service TSIService {
   TSStatus executeOperationSync(1:TSOperationSyncWriteReq req);
 
   TSBackupConfigurationResp getBackupConfiguration();
+
+  TSConnectionInfoResp fetchAllConnectionsInfo();
 }
\ No newline at end of file