You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/02/07 03:42:39 UTC
[iotdb] 01/01: Fix client oom v1.0.0
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch fix_client_oom_v1.0.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 35bf5d6d11502c988cfaeacb6a4d0c9a5b650eaa
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Feb 7 11:42:22 2023 +0800
Fix client oom v1.0.0
---
integration-test/pom.xml | 210 ++++++++++-----------
pom.xml | 2 +-
.../java/org/apache/iotdb/session/ISession.java | 6 +
.../java/org/apache/iotdb/session/Session.java | 29 ++-
.../org/apache/iotdb/session/pool/SessionPool.java | 16 +-
5 files changed, 154 insertions(+), 109 deletions(-)
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 1b5addb22a..bae141d437 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -95,112 +95,112 @@
<build>
<plugins>
<!-- skip default-test -->
-<!-- <plugin>-->
-<!-- <groupId>org.apache.maven.plugins</groupId>-->
-<!-- <artifactId>maven-surefire-plugin</artifactId>-->
-<!-- <executions>-->
-<!-- <execution>-->
-<!-- <id>default-test</id>-->
-<!-- <configuration>-->
-<!-- <skip>true</skip>-->
-<!-- </configuration>-->
-<!-- </execution>-->
-<!-- </executions>-->
-<!-- </plugin>-->
-<!-- <!– If the test starts separate processes, we should package first –>-->
-<!-- <plugin>-->
-<!-- <groupId>org.apache.maven.plugins</groupId>-->
-<!-- <artifactId>maven-assembly-plugin</artifactId>-->
-<!-- <version>${maven.assembly.version}</version>-->
-<!-- <configuration>-->
-<!-- <skipAssembly>${integrationTest.launchNodeInSameJVM}</skipAssembly>-->
-<!-- </configuration>-->
-<!-- <executions>-->
-<!-- <!– Package binaries–>-->
-<!-- <execution>-->
-<!-- <id>cluster-test-assembly</id>-->
-<!-- <phase>package</phase>-->
-<!-- <goals>-->
-<!-- <goal>single</goal>-->
-<!-- </goals>-->
-<!-- <configuration>-->
-<!-- <descriptors>-->
-<!-- <descriptor>src/assembly/mpp-test.xml</descriptor>-->
-<!-- </descriptors>-->
-<!-- <finalName>template-node</finalName>-->
-<!-- <appendAssemblyId>false</appendAssemblyId>-->
-<!-- </configuration>-->
-<!-- </execution>-->
-<!-- <execution>-->
-<!-- <id>cluster-test-assembly-share</id>-->
-<!-- <phase>package</phase>-->
-<!-- <goals>-->
-<!-- <goal>single</goal>-->
-<!-- </goals>-->
-<!-- <configuration>-->
-<!-- <descriptors>-->
-<!-- <descriptor>src/assembly/mpp-share.xml</descriptor>-->
-<!-- </descriptors>-->
-<!-- <finalName>template-node-share</finalName>-->
-<!-- <appendAssemblyId>false</appendAssemblyId>-->
-<!-- </configuration>-->
-<!-- </execution>-->
-<!-- </executions>-->
-<!-- </plugin>-->
+ <!-- <plugin>-->
+ <!-- <groupId>org.apache.maven.plugins</groupId>-->
+ <!-- <artifactId>maven-surefire-plugin</artifactId>-->
+ <!-- <executions>-->
+ <!-- <execution>-->
+ <!-- <id>default-test</id>-->
+ <!-- <configuration>-->
+ <!-- <skip>true</skip>-->
+ <!-- </configuration>-->
+ <!-- </execution>-->
+ <!-- </executions>-->
+ <!-- </plugin>-->
+ <!-- <!– If the test starts separate processes, we should package first –>-->
+ <!-- <plugin>-->
+ <!-- <groupId>org.apache.maven.plugins</groupId>-->
+ <!-- <artifactId>maven-assembly-plugin</artifactId>-->
+ <!-- <version>${maven.assembly.version}</version>-->
+ <!-- <configuration>-->
+ <!-- <skipAssembly>${integrationTest.launchNodeInSameJVM}</skipAssembly>-->
+ <!-- </configuration>-->
+ <!-- <executions>-->
+ <!-- <!– Package binaries–>-->
+ <!-- <execution>-->
+ <!-- <id>cluster-test-assembly</id>-->
+ <!-- <phase>package</phase>-->
+ <!-- <goals>-->
+ <!-- <goal>single</goal>-->
+ <!-- </goals>-->
+ <!-- <configuration>-->
+ <!-- <descriptors>-->
+ <!-- <descriptor>src/assembly/mpp-test.xml</descriptor>-->
+ <!-- </descriptors>-->
+ <!-- <finalName>template-node</finalName>-->
+ <!-- <appendAssemblyId>false</appendAssemblyId>-->
+ <!-- </configuration>-->
+ <!-- </execution>-->
+ <!-- <execution>-->
+ <!-- <id>cluster-test-assembly-share</id>-->
+ <!-- <phase>package</phase>-->
+ <!-- <goals>-->
+ <!-- <goal>single</goal>-->
+ <!-- </goals>-->
+ <!-- <configuration>-->
+ <!-- <descriptors>-->
+ <!-- <descriptor>src/assembly/mpp-share.xml</descriptor>-->
+ <!-- </descriptors>-->
+ <!-- <finalName>template-node-share</finalName>-->
+ <!-- <appendAssemblyId>false</appendAssemblyId>-->
+ <!-- </configuration>-->
+ <!-- </execution>-->
+ <!-- </executions>-->
+ <!-- </plugin>-->
<!-- Run integration tests -->
-<!-- <plugin>-->
-<!-- <groupId>org.apache.maven.plugins</groupId>-->
-<!-- <artifactId>maven-failsafe-plugin</artifactId>-->
-<!-- <executions>-->
-<!-- <execution>-->
-<!-- <id>integration-test</id>-->
-<!-- <goals>-->
-<!-- <goal>integration-test</goal>-->
-<!-- </goals>-->
-<!-- <configuration>-->
-<!-- <groups>${integrationTest.includedGroups}</groups>-->
-<!-- <excludedGroups>${integrationTest.excludedGroups}</excludedGroups>-->
-<!-- <useSystemClassLoader>false</useSystemClassLoader>-->
-<!-- <parallel>${integrationTest.parallelMode}</parallel>-->
-<!-- <threadCount>1</threadCount>-->
-<!-- <forkCount>${integrationTest.forkCount}</forkCount>-->
-<!-- <reuseForks>false</reuseForks>-->
-<!-- <systemPropertyVariables>-->
-<!-- <TestEnv>${integrationTest.testEnv}</TestEnv>-->
-<!-- <RandomSelectWriteNode>${integrationTest.randomSelectWriteNode}</RandomSelectWriteNode>-->
-<!-- <ReadAndVerifyWithMultiNode>${integrationTest.readAndVerifyWithMultiNode}</ReadAndVerifyWithMultiNode>-->
-<!-- </systemPropertyVariables>-->
-<!-- <summaryFile>target/failsafe-reports/failsafe-summary-IT.xml</summaryFile>-->
-<!-- </configuration>-->
-<!-- </execution>-->
-<!-- <execution>-->
-<!-- <id>verify</id>-->
-<!-- <goals>-->
-<!-- <goal>verify</goal>-->
-<!-- </goals>-->
-<!-- <configuration>-->
-<!-- <skipTests>true</skipTests>-->
-<!-- </configuration>-->
-<!-- </execution>-->
-<!-- </executions>-->
-<!-- </plugin>-->
-<!-- <plugin>-->
-<!-- <groupId>org.codehaus.mojo</groupId>-->
-<!-- <artifactId>exec-maven-plugin</artifactId>-->
-<!-- <version>1.6.0</version>-->
-<!-- <configuration>-->
-<!-- <mainClass>org.apache.iotdb.it.framework.IoTDBTestReporter</mainClass>-->
-<!-- </configuration>-->
-<!-- <executions>-->
-<!-- <execution>-->
-<!-- <id>report</id>-->
-<!-- <phase>post-integration-test</phase>-->
-<!-- <goals>-->
-<!-- <goal>java</goal>-->
-<!-- </goals>-->
-<!-- </execution>-->
-<!-- </executions>-->
-<!-- </plugin>-->
+ <!-- <plugin>-->
+ <!-- <groupId>org.apache.maven.plugins</groupId>-->
+ <!-- <artifactId>maven-failsafe-plugin</artifactId>-->
+ <!-- <executions>-->
+ <!-- <execution>-->
+ <!-- <id>integration-test</id>-->
+ <!-- <goals>-->
+ <!-- <goal>integration-test</goal>-->
+ <!-- </goals>-->
+ <!-- <configuration>-->
+ <!-- <groups>${integrationTest.includedGroups}</groups>-->
+ <!-- <excludedGroups>${integrationTest.excludedGroups}</excludedGroups>-->
+ <!-- <useSystemClassLoader>false</useSystemClassLoader>-->
+ <!-- <parallel>${integrationTest.parallelMode}</parallel>-->
+ <!-- <threadCount>1</threadCount>-->
+ <!-- <forkCount>${integrationTest.forkCount}</forkCount>-->
+ <!-- <reuseForks>false</reuseForks>-->
+ <!-- <systemPropertyVariables>-->
+ <!-- <TestEnv>${integrationTest.testEnv}</TestEnv>-->
+ <!-- <RandomSelectWriteNode>${integrationTest.randomSelectWriteNode}</RandomSelectWriteNode>-->
+ <!-- <ReadAndVerifyWithMultiNode>${integrationTest.readAndVerifyWithMultiNode}</ReadAndVerifyWithMultiNode>-->
+ <!-- </systemPropertyVariables>-->
+ <!-- <summaryFile>target/failsafe-reports/failsafe-summary-IT.xml</summaryFile>-->
+ <!-- </configuration>-->
+ <!-- </execution>-->
+ <!-- <execution>-->
+ <!-- <id>verify</id>-->
+ <!-- <goals>-->
+ <!-- <goal>verify</goal>-->
+ <!-- </goals>-->
+ <!-- <configuration>-->
+ <!-- <skipTests>true</skipTests>-->
+ <!-- </configuration>-->
+ <!-- </execution>-->
+ <!-- </executions>-->
+ <!-- </plugin>-->
+ <!-- <plugin>-->
+ <!-- <groupId>org.codehaus.mojo</groupId>-->
+ <!-- <artifactId>exec-maven-plugin</artifactId>-->
+ <!-- <version>1.6.0</version>-->
+ <!-- <configuration>-->
+ <!-- <mainClass>org.apache.iotdb.it.framework.IoTDBTestReporter</mainClass>-->
+ <!-- </configuration>-->
+ <!-- <executions>-->
+ <!-- <execution>-->
+ <!-- <id>report</id>-->
+ <!-- <phase>post-integration-test</phase>-->
+ <!-- <goals>-->
+ <!-- <goal>java</goal>-->
+ <!-- </goals>-->
+ <!-- </execution>-->
+ <!-- </executions>-->
+ <!-- </plugin>-->
</plugins>
</build>
<profiles>
diff --git a/pom.xml b/pom.xml
index ad884dbb06..afe1751ff1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1188,7 +1188,7 @@
</activation>
<properties>
<os.classifier>mac-x86_64</os.classifier>
- <thrift.download-url>https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-MacOS</thrift.download-url>
+ <thrift.download-url>https://gitbox.apache.org/repos/asf?p=iotdb-bin-resources.git;a=blob_plain;f=compile-tools/thrift-0.14-MacOS;hb=HEAD</thrift.download-url>
<thrift.executable>thrift_0.14.1_mac.exe</thrift.executable>
<thrift.skip-making-executable>false</thrift.skip-making-executable>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 3ea579b5c1..c9730c58c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -53,6 +53,12 @@ public interface ISession extends AutoCloseable {
void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException;
+ void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint)
+ throws IoTDBConnectionException;
+
void close() throws IoTDBConnectionException;
SessionConnection constructSessionConnection(Session session, TEndPoint endpoint, ZoneId zoneId)
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index a8c7872ff3..fd337d17b0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -401,6 +401,28 @@ public class Session implements ISession {
}
}
+ @Override
+ public synchronized void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint)
+ throws IoTDBConnectionException {
+ if (!isClosed) {
+ return;
+ }
+
+ this.enableRPCCompression = enableRPCCompression;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+ defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ isClosed = false;
+ if (enableRedirection || enableQueryRedirection) {
+ this.deviceIdToEndpoint = deviceIdToEndpoint;
+ endPointToSessionConnection = new ConcurrentHashMap<>();
+ endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+ }
+ }
+
@Override
public synchronized void close() throws IoTDBConnectionException {
if (isClosed) {
@@ -949,6 +971,10 @@ public class Session implements ISession {
return;
}
AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
+ if (deviceIdToEndpoint.containsKey(deviceId)
+ && deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+ return;
+ }
deviceIdToEndpoint.put(deviceId, endpoint);
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
@@ -3197,7 +3223,7 @@ public class Session implements ISession {
* @param insertConsumer insert function
* @param <T>
* <ul>
- * <li>{@link TSInsertRecordsReq}
+ B * <li>{@link TSInsertRecordsReq}
* <li>{@link TSInsertStringRecordsReq}
* <li>{@link TSInsertTabletsReq}
* </ul>
@@ -3243,6 +3269,7 @@ public class Session implements ISession {
completableFuture.join();
} catch (CompletionException completionException) {
Throwable cause = completionException.getCause();
+ logger.error("Some error here!", cause);
if (cause instanceof IoTDBConnectionException) {
throw (IoTDBConnectionException) cause;
} else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index c4192b5b06..f079eeab95 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.session.pool;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -89,6 +90,8 @@ public class SessionPool {
private boolean enableRedirection;
private boolean enableQueryRedirection = false;
+ private volatile Map<String, TEndPoint> deviceIdToEndpoint;
+
private int thriftDefaultBufferSize;
private int thriftMaxFrameSize;
@@ -295,6 +298,9 @@ public class SessionPool {
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -326,6 +332,9 @@ public class SessionPool {
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -444,7 +453,7 @@ public class SessionPool {
session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs);
+ session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -540,7 +549,7 @@ public class SessionPool {
private void tryConstructNewSession() {
Session session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs);
+ session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -2545,6 +2554,9 @@ public class SessionPool {
public void setEnableRedirection(boolean enableRedirection) {
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
for (Session session : queue) {
session.setEnableRedirection(enableRedirection);
}