You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/27 05:00:13 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] [IOTDB-4767] Support fetching all connection info in Session & SessionPool (#7742)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 42bda48467 [To rel/0.13] [IOTDB-4767] Support fetching all connection info in Session & SessionPool (#7742)
42bda48467 is described below
commit 42bda48467e1aff9dda9982c632219ba4e5a4565
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Thu Oct 27 13:00:07 2022 +0800
[To rel/0.13] [IOTDB-4767] Support fetching all connection info in Session & SessionPool (#7742)
---
.../iotdb/session/IoTDBConnectionInfoIT.java | 93 ++++++++++++++++++++++
.../apache/iotdb/session/pool/SessionPoolTest.java | 35 ++++++++
.../iotdb/db/query/control/SessionManager.java | 12 +++
.../query/control/clientsession/ClientSession.java | 16 +++-
.../control/clientsession/IClientSession.java | 34 ++++++--
.../control/clientsession/MqttClientSession.java | 14 +++-
.../db/service/thrift/impl/TSServiceImpl.java | 6 ++
.../java/org/apache/iotdb/session/Session.java | 5 ++
.../apache/iotdb/session/SessionConnection.java | 17 ++++
.../org/apache/iotdb/session/pool/SessionPool.java | 21 +++++
thrift/rpc-changelist.md | 4 +
thrift/src/main/thrift/rpc.thrift | 18 +++++
12 files changed, 266 insertions(+), 9 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/session/IoTDBConnectionInfoIT.java b/integration/src/test/java/org/apache/iotdb/session/IoTDBConnectionInfoIT.java
new file mode 100644
index 0000000000..d316eed909
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/session/IoTDBConnectionInfoIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.ZoneId;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class IoTDBConnectionInfoIT {
+
+ @Before
+ public void setUp() {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testGetBackupConfiguration() throws IoTDBConnectionException {
+ Session session1 = new Session("127.0.0.1", 6667, "root", "root", ZoneId.of("+05:00"));
+ Session session2 = new Session("127.0.0.1", 6667, "root", "root", ZoneId.of("+05:00"));
+ try {
+ session1.open();
+ session2.open();
+ TSConnectionInfoResp resp1 = session1.fetchAllConnections();
+ TSConnectionInfoResp resp2 = session2.fetchAllConnections();
+ assertEquals(2, resp1.connectionInfoList.size());
+ assertEquals("root", resp1.connectionInfoList.get(0).userName);
+ assertEquals("root", resp1.connectionInfoList.get(1).userName);
+ assertEquals(TSConnectionType.THRIFT_BASED, resp1.connectionInfoList.get(0).type);
+ assertEquals(TSConnectionType.THRIFT_BASED, resp1.connectionInfoList.get(1).type);
+ assertTrue(resp1.connectionInfoList.get(0).connectionId.startsWith("127.0.0.1"));
+ assertTrue(resp1.connectionInfoList.get(1).connectionId.startsWith("127.0.0.1"));
+
+ assertEquals(2, resp2.connectionInfoList.size());
+ assertEquals("root", resp2.connectionInfoList.get(0).userName);
+ assertEquals("root", resp2.connectionInfoList.get(1).userName);
+ assertEquals(TSConnectionType.THRIFT_BASED, resp2.connectionInfoList.get(0).type);
+ assertEquals(TSConnectionType.THRIFT_BASED, resp2.connectionInfoList.get(1).type);
+ assertTrue(resp2.connectionInfoList.get(0).connectionId.startsWith("127.0.0.1"));
+ assertTrue(resp2.connectionInfoList.get(1).connectionId.startsWith("127.0.0.1"));
+
+ assertEquals(
+ resp1.connectionInfoList.get(0).connectionId,
+ resp2.connectionInfoList.get(0).connectionId);
+ assertEquals(
+ resp1.connectionInfoList.get(1).connectionId,
+ resp2.connectionInfoList.get(1).connectionId);
+ assertEquals(
+ resp1.connectionInfoList.get(0).logInTime, resp2.connectionInfoList.get(0).logInTime);
+ assertEquals(
+ resp1.connectionInfoList.get(1).logInTime, resp2.connectionInfoList.get(1).logInTime);
+
+ } catch (Exception e) {
+ fail();
+ } finally {
+ session1.close();
+ session2.close();
+ }
+ }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index ff51933e4b..22f5bb23b0 100644
--- a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
import org.apache.iotdb.session.Config;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -479,4 +481,37 @@ public class SessionPoolTest {
assertEquals(3, pool.getConnectionTimeoutInMs());
assertEquals(ZoneOffset.UTC, pool.getZoneId());
}
+
+ @Test
+ public void testFetchConnections() {
+ SessionPool pool =
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 60000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+
+ try {
+
+ TSConnectionInfoResp resp = pool.fetchAllConnections();
+
+ assertEquals(1, resp.connectionInfoList.size());
+ assertEquals("root", resp.connectionInfoList.get(0).userName);
+ assertEquals(TSConnectionType.THRIFT_BASED, resp.connectionInfoList.get(0).type);
+ assertTrue(resp.connectionInfoList.get(0).connectionId.startsWith("127.0.0.1"));
+
+ } catch (IoTDBConnectionException e) {
+ logger.error("testFetchConnections failed", e);
+ fail(e.getMessage());
+ } finally {
+ pool.close();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 025c8bc423..294be39662 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -23,12 +23,15 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.control.clientsession.IClientSession;
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
+import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
@@ -143,6 +146,7 @@ public class SessionManager implements SessionManagerMBean {
session.setZoneId(ZoneId.of(zoneId));
session.setClientVersion(clientVersion);
session.setLogin(true);
+ session.setLogInTime(System.currentTimeMillis());
}
/**
@@ -264,6 +268,14 @@ public class SessionManager implements SessionManagerMBean {
return this.sessions.keySet().stream().map(x -> x.toString()).collect(Collectors.toSet());
}
+ public TSConnectionInfoResp getAllConnectionInfo() {
+ return new TSConnectionInfoResp(
+ sessions.keySet().stream()
+ .map(IClientSession::convertToTSConnectionInfo)
+ .sorted(Comparator.comparingLong(TSConnectionInfo::getLogInTime))
+ .collect(Collectors.toList()));
+ }
+
private static class SessionManagerHelper {
private static final SessionManager INSTANCE = new SessionManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
index c2a56a8a13..0622715787 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.query.control.clientsession;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
import java.io.IOException;
import java.net.Socket;
import java.util.Set;
@@ -26,10 +28,10 @@ import java.util.concurrent.CopyOnWriteArraySet;
/** Client Session is the only identity for a connection. */
public class ClientSession extends IClientSession {
- Socket clientSocket;
+ private final Socket clientSocket;
// TODO why we use copyOnWriteArraySet instead of HashSet??
- Set<Long> statements = new CopyOnWriteArraySet();
+ private final Set<Long> statements = new CopyOnWriteArraySet<>();
public ClientSession(Socket clientSocket) {
this.clientSocket = clientSocket;
@@ -45,6 +47,16 @@ public class ClientSession extends IClientSession {
return clientSocket.getPort();
}
+ @Override
+ TSConnectionType getConnectionType() {
+ return TSConnectionType.THRIFT_BASED;
+ }
+
+ @Override
+ String getConnectionId() {
+ return String.format("%s:%s", getClientAddress(), getClientPort());
+ }
+
@Override
public Set<Long> getStatementIds() {
return statements;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
index b531db45a8..e55e4a4781 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.query.control.clientsession;
import org.apache.iotdb.db.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
import java.time.ZoneId;
import java.util.Set;
@@ -27,23 +29,30 @@ import java.util.TimeZone;
public abstract class IClientSession {
/** id is just used for keep compatible with v0.13 */
- @Deprecated long id;
+ @Deprecated private long id;
- ClientVersion clientVersion;
+ private ClientVersion clientVersion;
- ZoneId zoneId;
+ private ZoneId zoneId;
// TODO: why some Statement Plans use timeZone while others use ZoneId?
- TimeZone timeZone;
+ private TimeZone timeZone;
- String username;
+ private String username;
- boolean login = false;
+ private boolean login = false;
+
+ private long logInTime;
abstract String getClientAddress();
abstract int getClientPort();
+ abstract TSConnectionType getConnectionType();
+
+ /** ip:port for thrift-based service and client id for mqtt-based service */
+ abstract String getConnectionId();
+
public void setClientVersion(ClientVersion clientVersion) {
this.clientVersion = clientVersion;
}
@@ -86,6 +95,14 @@ public abstract class IClientSession {
this.login = login;
}
+ public void setLogInTime(long logInTime) {
+ this.logInTime = logInTime;
+ }
+
+ public long getLogInTime() {
+ return logInTime;
+ }
+
@Deprecated
public long getId() {
return id;
@@ -101,6 +118,11 @@ public abstract class IClientSession {
"%d-%s:%s:%d", getId(), getUsername(), getClientAddress(), getClientPort());
}
+ public TSConnectionInfo convertToTSConnectionInfo() {
+ return new TSConnectionInfo(
+ getUsername(), getLogInTime(), getConnectionId(), getConnectionType());
+ }
+
/**
* statementIds that this client opens.<br>
* For JDBC clients, each Statement instance has a statement id.<br>
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
index 89acd74df7..a792c6f00f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java
@@ -18,12 +18,14 @@
*/
package org.apache.iotdb.db.query.control.clientsession;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
+
import java.util.Collections;
import java.util.Set;
public class MqttClientSession extends IClientSession {
- String clientID;
+ private final String clientID;
public MqttClientSession(String clientID) {
this.clientID = clientID;
@@ -43,6 +45,16 @@ public class MqttClientSession extends IClientSession {
return 0;
}
+ @Override
+ TSConnectionType getConnectionType() {
+ return TSConnectionType.MQTT_BASED;
+ }
+
+ @Override
+ String getConnectionId() {
+ return clientID;
+ }
+
public String toString() {
return String.format("%d-%s", getId(), getClientID());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index aebb170c86..eefc352bc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -91,6 +91,7 @@ import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -2209,6 +2210,11 @@ public class TSServiceImpl implements TSIService.Iface {
return syncConf;
}
+ @Override
+ public TSConnectionInfoResp fetchAllConnectionsInfo() {
+ return SESSION_MANAGER.getAllConnectionInfo();
+ }
+
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
try {
if (isEnableOperationSync) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index ba7e1fce6b..964eef770e 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -3110,6 +3111,10 @@ public class Session {
return defaultSessionConnection.getBackupConfiguration();
}
+ public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+ return defaultSessionConnection.fetchAllConnections();
+ }
+
public static class Builder {
private String host = Config.DEFAULT_HOST;
private int rpcPort = Config.DEFAULT_PORT;
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index f2316f3651..1f0c6b6919 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
@@ -1103,6 +1104,22 @@ public class SessionConnection {
return MSG_RECONNECTION_FAIL.concat(urls.toString());
}
+ public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+ try {
+ return client.fetchAllConnectionsInfo();
+ } catch (TException e) {
+ if (reconnect()) {
+ try {
+ return client.fetchAllConnectionsInfo();
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+ }
+
@Override
public String toString() {
return "SessionConnection{" + " endPoint=" + endPoint + "}";
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 031ce64d6b..711f7f66df 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.session.pool;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
import org.apache.iotdb.session.Config;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
@@ -2405,6 +2406,26 @@ public class SessionPool {
return connectionTimeoutInMs;
}
+ public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
+
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ TSConnectionInfoResp resp = session.fetchAllConnections();
+ putBack(session);
+ return resp;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("fetchAllConnections failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (Throwable t) {
+ putBack(session);
+ throw t;
+ }
+ }
+ return null;
+ }
+
public static class Builder {
private String host = Config.DEFAULT_HOST;
diff --git a/thrift/rpc-changelist.md b/thrift/rpc-changelist.md
index f3095478f1..8cd9dd09ed 100644
--- a/thrift/rpc-changelist.md
+++ b/thrift/rpc-changelist.md
@@ -39,6 +39,10 @@ Last Updated on 2022.1.17 by Xin Zhao.
| Add TSDropSchemaTemplateReq, TSStatus dropSchemaTemplate | Xin Zhao |
| Add TSCreateAlignedTimeseriesReq | Haonan Hou |
| Add TSOperationSyncWriteReq | Rongzhao Chen |
+| Add enum TSConnectionType | Yuan Tian |
+| Add struct TSConnectionInfo | Yuan Tian |
+| Add struct TSConnectionInfoResp | Yuan Tian |
+| Add method TSConnectionInfoResp fetchAllConnectionsInfo() | Yuan Tian |
## 3. Update
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 43d79d23b4..830bede11b 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -439,6 +439,22 @@ struct TSBackupConfigurationResp {
4: optional i32 secondaryPort
}
+enum TSConnectionType {
+ THRIFT_BASED
+ MQTT_BASED
+}
+
+struct TSConnectionInfo {
+ 1: required string userName
+ 2: required i64 logInTime
+ 3: required string connectionId // ip:port for thrift-based service and clientId for mqtt-based service
+ 4: required TSConnectionType type
+}
+
+struct TSConnectionInfoResp {
+ 1: required list<TSConnectionInfo> connectionInfoList
+}
+
service TSIService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
@@ -539,4 +555,6 @@ service TSIService {
TSStatus executeOperationSync(1:TSOperationSyncWriteReq req);
TSBackupConfigurationResp getBackupConfiguration();
+
+ TSConnectionInfoResp fetchAllConnectionsInfo();
}
\ No newline at end of file