You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/02/07 03:42:39 UTC

[iotdb] 01/01: Fix client oom v1.0.0

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_client_oom_v1.0.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 35bf5d6d11502c988cfaeacb6a4d0c9a5b650eaa
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Feb 7 11:42:22 2023 +0800

    Fix client oom v1.0.0
---
 integration-test/pom.xml                           | 210 ++++++++++-----------
 pom.xml                                            |   2 +-
 .../java/org/apache/iotdb/session/ISession.java    |   6 +
 .../java/org/apache/iotdb/session/Session.java     |  29 ++-
 .../org/apache/iotdb/session/pool/SessionPool.java |  16 +-
 5 files changed, 154 insertions(+), 109 deletions(-)

diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 1b5addb22a..bae141d437 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -95,112 +95,112 @@
     <build>
         <plugins>
             <!-- skip default-test -->
-<!--            <plugin>-->
-<!--                <groupId>org.apache.maven.plugins</groupId>-->
-<!--                <artifactId>maven-surefire-plugin</artifactId>-->
-<!--                <executions>-->
-<!--                    <execution>-->
-<!--                        <id>default-test</id>-->
-<!--                        <configuration>-->
-<!--                            <skip>true</skip>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
-<!--            &lt;!&ndash; If the test starts separate processes, we should package first &ndash;&gt;-->
-<!--            <plugin>-->
-<!--                <groupId>org.apache.maven.plugins</groupId>-->
-<!--                <artifactId>maven-assembly-plugin</artifactId>-->
-<!--                <version>${maven.assembly.version}</version>-->
-<!--                <configuration>-->
-<!--                    <skipAssembly>${integrationTest.launchNodeInSameJVM}</skipAssembly>-->
-<!--                </configuration>-->
-<!--                <executions>-->
-<!--                    &lt;!&ndash; Package binaries&ndash;&gt;-->
-<!--                    <execution>-->
-<!--                        <id>cluster-test-assembly</id>-->
-<!--                        <phase>package</phase>-->
-<!--                        <goals>-->
-<!--                            <goal>single</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <descriptors>-->
-<!--                                <descriptor>src/assembly/mpp-test.xml</descriptor>-->
-<!--                            </descriptors>-->
-<!--                            <finalName>template-node</finalName>-->
-<!--                            <appendAssemblyId>false</appendAssemblyId>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                    <execution>-->
-<!--                        <id>cluster-test-assembly-share</id>-->
-<!--                        <phase>package</phase>-->
-<!--                        <goals>-->
-<!--                            <goal>single</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <descriptors>-->
-<!--                                <descriptor>src/assembly/mpp-share.xml</descriptor>-->
-<!--                            </descriptors>-->
-<!--                            <finalName>template-node-share</finalName>-->
-<!--                            <appendAssemblyId>false</appendAssemblyId>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.apache.maven.plugins</groupId>-->
+            <!--                <artifactId>maven-surefire-plugin</artifactId>-->
+            <!--                <executions>-->
+            <!--                    <execution>-->
+            <!--                        <id>default-test</id>-->
+            <!--                        <configuration>-->
+            <!--                            <skip>true</skip>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
+            <!--            &lt;!&ndash; If the test starts separate processes, we should package first &ndash;&gt;-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.apache.maven.plugins</groupId>-->
+            <!--                <artifactId>maven-assembly-plugin</artifactId>-->
+            <!--                <version>${maven.assembly.version}</version>-->
+            <!--                <configuration>-->
+            <!--                    <skipAssembly>${integrationTest.launchNodeInSameJVM}</skipAssembly>-->
+            <!--                </configuration>-->
+            <!--                <executions>-->
+            <!--                    &lt;!&ndash; Package binaries&ndash;&gt;-->
+            <!--                    <execution>-->
+            <!--                        <id>cluster-test-assembly</id>-->
+            <!--                        <phase>package</phase>-->
+            <!--                        <goals>-->
+            <!--                            <goal>single</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <descriptors>-->
+            <!--                                <descriptor>src/assembly/mpp-test.xml</descriptor>-->
+            <!--                            </descriptors>-->
+            <!--                            <finalName>template-node</finalName>-->
+            <!--                            <appendAssemblyId>false</appendAssemblyId>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                    <execution>-->
+            <!--                        <id>cluster-test-assembly-share</id>-->
+            <!--                        <phase>package</phase>-->
+            <!--                        <goals>-->
+            <!--                            <goal>single</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <descriptors>-->
+            <!--                                <descriptor>src/assembly/mpp-share.xml</descriptor>-->
+            <!--                            </descriptors>-->
+            <!--                            <finalName>template-node-share</finalName>-->
+            <!--                            <appendAssemblyId>false</appendAssemblyId>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
             <!-- Run integration tests -->
-<!--            <plugin>-->
-<!--                <groupId>org.apache.maven.plugins</groupId>-->
-<!--                <artifactId>maven-failsafe-plugin</artifactId>-->
-<!--                <executions>-->
-<!--                    <execution>-->
-<!--                        <id>integration-test</id>-->
-<!--                        <goals>-->
-<!--                            <goal>integration-test</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <groups>${integrationTest.includedGroups}</groups>-->
-<!--                            <excludedGroups>${integrationTest.excludedGroups}</excludedGroups>-->
-<!--                            <useSystemClassLoader>false</useSystemClassLoader>-->
-<!--                            <parallel>${integrationTest.parallelMode}</parallel>-->
-<!--                            <threadCount>1</threadCount>-->
-<!--                            <forkCount>${integrationTest.forkCount}</forkCount>-->
-<!--                            <reuseForks>false</reuseForks>-->
-<!--                            <systemPropertyVariables>-->
-<!--                                <TestEnv>${integrationTest.testEnv}</TestEnv>-->
-<!--                                <RandomSelectWriteNode>${integrationTest.randomSelectWriteNode}</RandomSelectWriteNode>-->
-<!--                                <ReadAndVerifyWithMultiNode>${integrationTest.readAndVerifyWithMultiNode}</ReadAndVerifyWithMultiNode>-->
-<!--                            </systemPropertyVariables>-->
-<!--                            <summaryFile>target/failsafe-reports/failsafe-summary-IT.xml</summaryFile>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                    <execution>-->
-<!--                        <id>verify</id>-->
-<!--                        <goals>-->
-<!--                            <goal>verify</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <skipTests>true</skipTests>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
-<!--            <plugin>-->
-<!--                <groupId>org.codehaus.mojo</groupId>-->
-<!--                <artifactId>exec-maven-plugin</artifactId>-->
-<!--                <version>1.6.0</version>-->
-<!--                <configuration>-->
-<!--                    <mainClass>org.apache.iotdb.it.framework.IoTDBTestReporter</mainClass>-->
-<!--                </configuration>-->
-<!--                <executions>-->
-<!--                    <execution>-->
-<!--                        <id>report</id>-->
-<!--                        <phase>post-integration-test</phase>-->
-<!--                        <goals>-->
-<!--                            <goal>java</goal>-->
-<!--                        </goals>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.apache.maven.plugins</groupId>-->
+            <!--                <artifactId>maven-failsafe-plugin</artifactId>-->
+            <!--                <executions>-->
+            <!--                    <execution>-->
+            <!--                        <id>integration-test</id>-->
+            <!--                        <goals>-->
+            <!--                            <goal>integration-test</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <groups>${integrationTest.includedGroups}</groups>-->
+            <!--                            <excludedGroups>${integrationTest.excludedGroups}</excludedGroups>-->
+            <!--                            <useSystemClassLoader>false</useSystemClassLoader>-->
+            <!--                            <parallel>${integrationTest.parallelMode}</parallel>-->
+            <!--                            <threadCount>1</threadCount>-->
+            <!--                            <forkCount>${integrationTest.forkCount}</forkCount>-->
+            <!--                            <reuseForks>false</reuseForks>-->
+            <!--                            <systemPropertyVariables>-->
+            <!--                                <TestEnv>${integrationTest.testEnv}</TestEnv>-->
+            <!--                                <RandomSelectWriteNode>${integrationTest.randomSelectWriteNode}</RandomSelectWriteNode>-->
+            <!--                                <ReadAndVerifyWithMultiNode>${integrationTest.readAndVerifyWithMultiNode}</ReadAndVerifyWithMultiNode>-->
+            <!--                            </systemPropertyVariables>-->
+            <!--                            <summaryFile>target/failsafe-reports/failsafe-summary-IT.xml</summaryFile>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                    <execution>-->
+            <!--                        <id>verify</id>-->
+            <!--                        <goals>-->
+            <!--                            <goal>verify</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <skipTests>true</skipTests>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.codehaus.mojo</groupId>-->
+            <!--                <artifactId>exec-maven-plugin</artifactId>-->
+            <!--                <version>1.6.0</version>-->
+            <!--                <configuration>-->
+            <!--                    <mainClass>org.apache.iotdb.it.framework.IoTDBTestReporter</mainClass>-->
+            <!--                </configuration>-->
+            <!--                <executions>-->
+            <!--                    <execution>-->
+            <!--                        <id>report</id>-->
+            <!--                        <phase>post-integration-test</phase>-->
+            <!--                        <goals>-->
+            <!--                            <goal>java</goal>-->
+            <!--                        </goals>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
         </plugins>
     </build>
     <profiles>
diff --git a/pom.xml b/pom.xml
index ad884dbb06..afe1751ff1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1188,7 +1188,7 @@
             </activation>
             <properties>
                 <os.classifier>mac-x86_64</os.classifier>
-                <thrift.download-url>https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-MacOS</thrift.download-url>
+                <thrift.download-url>https://gitbox.apache.org/repos/asf?p=iotdb-bin-resources.git;a=blob_plain;f=compile-tools/thrift-0.14-MacOS;hb=HEAD</thrift.download-url>
                 <thrift.executable>thrift_0.14.1_mac.exe</thrift.executable>
                 <thrift.skip-making-executable>false</thrift.skip-making-executable>
                 <thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 3ea579b5c1..c9730c58c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -53,6 +53,12 @@ public interface ISession extends AutoCloseable {
   void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException;
 
+  void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException;
+
   void close() throws IoTDBConnectionException;
 
   SessionConnection constructSessionConnection(Session session, TEndPoint endpoint, ZoneId zoneId)
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index a8c7872ff3..fd337d17b0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -401,6 +401,28 @@ public class Session implements ISession {
     }
   }
 
+  @Override
+  public synchronized void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+    isClosed = false;
+    if (enableRedirection || enableQueryRedirection) {
+      this.deviceIdToEndpoint = deviceIdToEndpoint;
+      endPointToSessionConnection = new ConcurrentHashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+    }
+  }
+
   @Override
   public synchronized void close() throws IoTDBConnectionException {
     if (isClosed) {
@@ -949,6 +971,10 @@ public class Session implements ISession {
         return;
       }
       AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
+      if (deviceIdToEndpoint.containsKey(deviceId)
+          && deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+        return;
+      }
       deviceIdToEndpoint.put(deviceId, endpoint);
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
@@ -3197,7 +3223,7 @@ public class Session implements ISession {
    * @param insertConsumer insert function
    * @param <T>
    *     <ul>
-   *       <li>{@link TSInsertRecordsReq}
+   *       <li>{@link TSInsertRecordsReq}
    *       <li>{@link TSInsertStringRecordsReq}
    *       <li>{@link TSInsertTabletsReq}
    *     </ul>
@@ -3243,6 +3269,7 @@ public class Session implements ISession {
         completableFuture.join();
       } catch (CompletionException completionException) {
         Throwable cause = completionException.getCause();
+        logger.error("Some error here!", cause);
         if (cause instanceof IoTDBConnectionException) {
           throw (IoTDBConnectionException) cause;
         } else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index c4192b5b06..f079eeab95 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.session.pool;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -89,6 +90,8 @@ public class SessionPool {
   private boolean enableRedirection;
   private boolean enableQueryRedirection = false;
 
+  private volatile Map<String, TEndPoint> deviceIdToEndpoint;
+
   private int thriftDefaultBufferSize;
   private int thriftMaxFrameSize;
 
@@ -295,6 +298,9 @@ public class SessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -326,6 +332,9 @@ public class SessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -444,7 +453,7 @@ public class SessionPool {
       session = constructNewSession();
 
       try {
-        session.open(enableCompression, connectionTimeoutInMs);
+        session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -540,7 +549,7 @@ public class SessionPool {
   private void tryConstructNewSession() {
     Session session = constructNewSession();
     try {
-      session.open(enableCompression, connectionTimeoutInMs);
+      session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -2545,6 +2554,9 @@ public class SessionPool {
 
   public void setEnableRedirection(boolean enableRedirection) {
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     for (Session session : queue) {
       session.setEnableRedirection(enableRedirection);
     }