You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2023/02/07 03:42:38 UTC

[iotdb] branch fix_client_oom_v1.0.0 created (now 35bf5d6d11)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch fix_client_oom_v1.0.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 35bf5d6d11 Fix client oom v1.0.0

This branch includes the following new commits:

     new 35bf5d6d11 Fix client oom v1.0.0

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Fix client oom v1.0.0

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch fix_client_oom_v1.0.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 35bf5d6d11502c988cfaeacb6a4d0c9a5b650eaa
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Feb 7 11:42:22 2023 +0800

    Fix client oom v1.0.0
---
 integration-test/pom.xml                           | 210 ++++++++++-----------
 pom.xml                                            |   2 +-
 .../java/org/apache/iotdb/session/ISession.java    |   6 +
 .../java/org/apache/iotdb/session/Session.java     |  29 ++-
 .../org/apache/iotdb/session/pool/SessionPool.java |  16 +-
 5 files changed, 154 insertions(+), 109 deletions(-)

diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 1b5addb22a..bae141d437 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -95,112 +95,112 @@
     <build>
         <plugins>
             <!-- skip default-test -->
-<!--            <plugin>-->
-<!--                <groupId>org.apache.maven.plugins</groupId>-->
-<!--                <artifactId>maven-surefire-plugin</artifactId>-->
-<!--                <executions>-->
-<!--                    <execution>-->
-<!--                        <id>default-test</id>-->
-<!--                        <configuration>-->
-<!--                            <skip>true</skip>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
-<!--            &lt;!&ndash; If the test starts separate processes, we should package first &ndash;&gt;-->
-<!--            <plugin>-->
-<!--                <groupId>org.apache.maven.plugins</groupId>-->
-<!--                <artifactId>maven-assembly-plugin</artifactId>-->
-<!--                <version>${maven.assembly.version}</version>-->
-<!--                <configuration>-->
-<!--                    <skipAssembly>${integrationTest.launchNodeInSameJVM}</skipAssembly>-->
-<!--                </configuration>-->
-<!--                <executions>-->
-<!--                    &lt;!&ndash; Package binaries&ndash;&gt;-->
-<!--                    <execution>-->
-<!--                        <id>cluster-test-assembly</id>-->
-<!--                        <phase>package</phase>-->
-<!--                        <goals>-->
-<!--                            <goal>single</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <descriptors>-->
-<!--                                <descriptor>src/assembly/mpp-test.xml</descriptor>-->
-<!--                            </descriptors>-->
-<!--                            <finalName>template-node</finalName>-->
-<!--                            <appendAssemblyId>false</appendAssemblyId>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                    <execution>-->
-<!--                        <id>cluster-test-assembly-share</id>-->
-<!--                        <phase>package</phase>-->
-<!--                        <goals>-->
-<!--                            <goal>single</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <descriptors>-->
-<!--                                <descriptor>src/assembly/mpp-share.xml</descriptor>-->
-<!--                            </descriptors>-->
-<!--                            <finalName>template-node-share</finalName>-->
-<!--                            <appendAssemblyId>false</appendAssemblyId>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.apache.maven.plugins</groupId>-->
+            <!--                <artifactId>maven-surefire-plugin</artifactId>-->
+            <!--                <executions>-->
+            <!--                    <execution>-->
+            <!--                        <id>default-test</id>-->
+            <!--                        <configuration>-->
+            <!--                            <skip>true</skip>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
+            <!--            &lt;!&ndash; If the test starts separate processes, we should package first &ndash;&gt;-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.apache.maven.plugins</groupId>-->
+            <!--                <artifactId>maven-assembly-plugin</artifactId>-->
+            <!--                <version>${maven.assembly.version}</version>-->
+            <!--                <configuration>-->
+            <!--                    <skipAssembly>${integrationTest.launchNodeInSameJVM}</skipAssembly>-->
+            <!--                </configuration>-->
+            <!--                <executions>-->
+            <!--                    &lt;!&ndash; Package binaries&ndash;&gt;-->
+            <!--                    <execution>-->
+            <!--                        <id>cluster-test-assembly</id>-->
+            <!--                        <phase>package</phase>-->
+            <!--                        <goals>-->
+            <!--                            <goal>single</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <descriptors>-->
+            <!--                                <descriptor>src/assembly/mpp-test.xml</descriptor>-->
+            <!--                            </descriptors>-->
+            <!--                            <finalName>template-node</finalName>-->
+            <!--                            <appendAssemblyId>false</appendAssemblyId>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                    <execution>-->
+            <!--                        <id>cluster-test-assembly-share</id>-->
+            <!--                        <phase>package</phase>-->
+            <!--                        <goals>-->
+            <!--                            <goal>single</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <descriptors>-->
+            <!--                                <descriptor>src/assembly/mpp-share.xml</descriptor>-->
+            <!--                            </descriptors>-->
+            <!--                            <finalName>template-node-share</finalName>-->
+            <!--                            <appendAssemblyId>false</appendAssemblyId>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
             <!-- Run integration tests -->
-<!--            <plugin>-->
-<!--                <groupId>org.apache.maven.plugins</groupId>-->
-<!--                <artifactId>maven-failsafe-plugin</artifactId>-->
-<!--                <executions>-->
-<!--                    <execution>-->
-<!--                        <id>integration-test</id>-->
-<!--                        <goals>-->
-<!--                            <goal>integration-test</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <groups>${integrationTest.includedGroups}</groups>-->
-<!--                            <excludedGroups>${integrationTest.excludedGroups}</excludedGroups>-->
-<!--                            <useSystemClassLoader>false</useSystemClassLoader>-->
-<!--                            <parallel>${integrationTest.parallelMode}</parallel>-->
-<!--                            <threadCount>1</threadCount>-->
-<!--                            <forkCount>${integrationTest.forkCount}</forkCount>-->
-<!--                            <reuseForks>false</reuseForks>-->
-<!--                            <systemPropertyVariables>-->
-<!--                                <TestEnv>${integrationTest.testEnv}</TestEnv>-->
-<!--                                <RandomSelectWriteNode>${integrationTest.randomSelectWriteNode}</RandomSelectWriteNode>-->
-<!--                                <ReadAndVerifyWithMultiNode>${integrationTest.readAndVerifyWithMultiNode}</ReadAndVerifyWithMultiNode>-->
-<!--                            </systemPropertyVariables>-->
-<!--                            <summaryFile>target/failsafe-reports/failsafe-summary-IT.xml</summaryFile>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                    <execution>-->
-<!--                        <id>verify</id>-->
-<!--                        <goals>-->
-<!--                            <goal>verify</goal>-->
-<!--                        </goals>-->
-<!--                        <configuration>-->
-<!--                            <skipTests>true</skipTests>-->
-<!--                        </configuration>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
-<!--            <plugin>-->
-<!--                <groupId>org.codehaus.mojo</groupId>-->
-<!--                <artifactId>exec-maven-plugin</artifactId>-->
-<!--                <version>1.6.0</version>-->
-<!--                <configuration>-->
-<!--                    <mainClass>org.apache.iotdb.it.framework.IoTDBTestReporter</mainClass>-->
-<!--                </configuration>-->
-<!--                <executions>-->
-<!--                    <execution>-->
-<!--                        <id>report</id>-->
-<!--                        <phase>post-integration-test</phase>-->
-<!--                        <goals>-->
-<!--                            <goal>java</goal>-->
-<!--                        </goals>-->
-<!--                    </execution>-->
-<!--                </executions>-->
-<!--            </plugin>-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.apache.maven.plugins</groupId>-->
+            <!--                <artifactId>maven-failsafe-plugin</artifactId>-->
+            <!--                <executions>-->
+            <!--                    <execution>-->
+            <!--                        <id>integration-test</id>-->
+            <!--                        <goals>-->
+            <!--                            <goal>integration-test</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <groups>${integrationTest.includedGroups}</groups>-->
+            <!--                            <excludedGroups>${integrationTest.excludedGroups}</excludedGroups>-->
+            <!--                            <useSystemClassLoader>false</useSystemClassLoader>-->
+            <!--                            <parallel>${integrationTest.parallelMode}</parallel>-->
+            <!--                            <threadCount>1</threadCount>-->
+            <!--                            <forkCount>${integrationTest.forkCount}</forkCount>-->
+            <!--                            <reuseForks>false</reuseForks>-->
+            <!--                            <systemPropertyVariables>-->
+            <!--                                <TestEnv>${integrationTest.testEnv}</TestEnv>-->
+            <!--                                <RandomSelectWriteNode>${integrationTest.randomSelectWriteNode}</RandomSelectWriteNode>-->
+            <!--                                <ReadAndVerifyWithMultiNode>${integrationTest.readAndVerifyWithMultiNode}</ReadAndVerifyWithMultiNode>-->
+            <!--                            </systemPropertyVariables>-->
+            <!--                            <summaryFile>target/failsafe-reports/failsafe-summary-IT.xml</summaryFile>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                    <execution>-->
+            <!--                        <id>verify</id>-->
+            <!--                        <goals>-->
+            <!--                            <goal>verify</goal>-->
+            <!--                        </goals>-->
+            <!--                        <configuration>-->
+            <!--                            <skipTests>true</skipTests>-->
+            <!--                        </configuration>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
+            <!--            <plugin>-->
+            <!--                <groupId>org.codehaus.mojo</groupId>-->
+            <!--                <artifactId>exec-maven-plugin</artifactId>-->
+            <!--                <version>1.6.0</version>-->
+            <!--                <configuration>-->
+            <!--                    <mainClass>org.apache.iotdb.it.framework.IoTDBTestReporter</mainClass>-->
+            <!--                </configuration>-->
+            <!--                <executions>-->
+            <!--                    <execution>-->
+            <!--                        <id>report</id>-->
+            <!--                        <phase>post-integration-test</phase>-->
+            <!--                        <goals>-->
+            <!--                            <goal>java</goal>-->
+            <!--                        </goals>-->
+            <!--                    </execution>-->
+            <!--                </executions>-->
+            <!--            </plugin>-->
         </plugins>
     </build>
     <profiles>
diff --git a/pom.xml b/pom.xml
index ad884dbb06..afe1751ff1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1188,7 +1188,7 @@
             </activation>
             <properties>
                 <os.classifier>mac-x86_64</os.classifier>
-                <thrift.download-url>https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-MacOS</thrift.download-url>
+                <thrift.download-url>https://gitbox.apache.org/repos/asf?p=iotdb-bin-resources.git;a=blob_plain;f=compile-tools/thrift-0.14-MacOS;hb=HEAD</thrift.download-url>
                 <thrift.executable>thrift_0.14.1_mac.exe</thrift.executable>
                 <thrift.skip-making-executable>false</thrift.skip-making-executable>
                 <thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 3ea579b5c1..c9730c58c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -53,6 +53,12 @@ public interface ISession extends AutoCloseable {
   void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException;
 
+  void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException;
+
   void close() throws IoTDBConnectionException;
 
   SessionConnection constructSessionConnection(Session session, TEndPoint endpoint, ZoneId zoneId)
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index a8c7872ff3..fd337d17b0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -401,6 +401,28 @@ public class Session implements ISession {
     }
   }
 
+  @Override
+  public synchronized void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+    isClosed = false;
+    if (enableRedirection || enableQueryRedirection) {
+      this.deviceIdToEndpoint = deviceIdToEndpoint;
+      endPointToSessionConnection = new ConcurrentHashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+    }
+  }
+
   @Override
   public synchronized void close() throws IoTDBConnectionException {
     if (isClosed) {
@@ -949,6 +971,10 @@ public class Session implements ISession {
         return;
       }
       AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
+      if (deviceIdToEndpoint.containsKey(deviceId)
+          && deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+        return;
+      }
       deviceIdToEndpoint.put(deviceId, endpoint);
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
@@ -3197,7 +3223,7 @@ public class Session implements ISession {
    * @param insertConsumer insert function
    * @param <T>
    *     <ul>
-   *       <li>{@link TSInsertRecordsReq}
+ B  *       <li>{@link TSInsertRecordsReq}
    *       <li>{@link TSInsertStringRecordsReq}
    *       <li>{@link TSInsertTabletsReq}
    *     </ul>
@@ -3243,6 +3269,7 @@ public class Session implements ISession {
         completableFuture.join();
       } catch (CompletionException completionException) {
         Throwable cause = completionException.getCause();
+        logger.error("Some error here!", cause);
         if (cause instanceof IoTDBConnectionException) {
           throw (IoTDBConnectionException) cause;
         } else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index c4192b5b06..f079eeab95 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.session.pool;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -89,6 +90,8 @@ public class SessionPool {
   private boolean enableRedirection;
   private boolean enableQueryRedirection = false;
 
+  private volatile Map<String, TEndPoint> deviceIdToEndpoint;
+
   private int thriftDefaultBufferSize;
   private int thriftMaxFrameSize;
 
@@ -295,6 +298,9 @@ public class SessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -326,6 +332,9 @@ public class SessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -444,7 +453,7 @@ public class SessionPool {
       session = constructNewSession();
 
       try {
-        session.open(enableCompression, connectionTimeoutInMs);
+        session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -540,7 +549,7 @@ public class SessionPool {
   private void tryConstructNewSession() {
     Session session = constructNewSession();
     try {
-      session.open(enableCompression, connectionTimeoutInMs);
+      session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -2545,6 +2554,9 @@ public class SessionPool {
 
   public void setEnableRedirection(boolean enableRedirection) {
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     for (Session session : queue) {
       session.setEnableRedirection(enableRedirection);
     }