You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/25 12:14:33 UTC

[iotdb] branch fast_write_test_with_guoneng updated: fix client concurrent issue

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/fast_write_test_with_guoneng by this push:
     new a833d1ac7a fix client concurrent issue
a833d1ac7a is described below

commit a833d1ac7aca1b0632a7e0a3206798dbb86d0555
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 25 20:14:20 2023 +0800

    fix client concurrent issue
---
 .../src/main/java/org/apache/iotdb/ReadTest.java   | 35 ++++++-----
 .../src/main/java/org/apache/iotdb/ReadTest2.java  | 68 ++++------------------
 .../apache/iotdb/{ReadTest.java => ReadTest3.java} | 58 +++++++++---------
 .../src/main/java/org/apache/iotdb/WriteTest.java  | 12 ++--
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |  4 --
 5 files changed, 61 insertions(+), 116 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest.java b/example/session/src/main/java/org/apache/iotdb/ReadTest.java
index 90bed6b8cd..c88f90697d 100644
--- a/example/session/src/main/java/org/apache/iotdb/ReadTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/ReadTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +31,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class ReadTest {
 
@@ -40,23 +38,20 @@ public class ReadTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest.class);
 
-  private static int THREAD_NUMBER = 100;
+  private static int THREAD_NUMBER = 300;
 
   private static int DEVICE_NUMBER = 20000;
 
-  private static int SENSOR_NUMBER = 500;
-
   private static int READ_LOOP = 10000000;
 
   private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L;
-
-  private static List<String> measurements;
-
-  private static List<TSDataType> types;
-
-  private static AtomicInteger totalRowNumber = new AtomicInteger();
-
   private static Random r;
+  private static SyncReadSignal lastQuerySignal =
+      new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+  private static SyncReadSignal rawQuerySignal =
+      new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+  private static SyncReadSignal avgQuerySignal =
+      new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
 
   /** Build a custom SessionPool for this example */
 
@@ -94,6 +89,7 @@ public class ReadTest {
       if (needResetLatch) {
         synchronized (this) {
           if (needResetLatch) {
+            LOGGER.info("[{}] RESET SIGNAL", queryName);
             latch = new CountDownLatch(this.count);
             needResetLatch = false;
             totalCost = 0L;
@@ -135,7 +131,7 @@ public class ReadTest {
     r = new Random();
 
     // Run last query
-    SyncReadSignal lastQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+
     Thread[] lastReadThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       lastReadThreads[i] =
@@ -146,14 +142,15 @@ public class ReadTest {
                     throws IoTDBConnectionException, StatementExecutionException {
                   queryLastValue();
                 }
-              });
+              },
+              "lastValueQuery-" + i);
     }
     for (Thread thread : lastReadThreads) {
       thread.start();
     }
 
     // Run raw query
-    SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+
     Thread[] rawReadThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       rawReadThreads[i] =
@@ -164,14 +161,15 @@ public class ReadTest {
                     throws IoTDBConnectionException, StatementExecutionException {
                   queryRawValue();
                 }
-              });
+              },
+              "rawValueQuery-" + i);
     }
     for (Thread thread : rawReadThreads) {
       thread.start();
     }
 
     // Run avg query
-    SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
+
     Thread[] avgReadThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       avgReadThreads[i] =
@@ -182,7 +180,8 @@ public class ReadTest {
                     throws IoTDBConnectionException, StatementExecutionException {
                   queryAvgValueGroupBy5Min();
                 }
-              });
+              },
+              "avgValueQuery-" + i);
     }
     for (Thread thread : avgReadThreads) {
       thread.start();
diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest2.java b/example/session/src/main/java/org/apache/iotdb/ReadTest2.java
index 4f5d6cece9..2d8e7670d9 100644
--- a/example/session/src/main/java/org/apache/iotdb/ReadTest2.java
+++ b/example/session/src/main/java/org/apache/iotdb/ReadTest2.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class ReadTest2 {
 
@@ -39,22 +38,14 @@ public class ReadTest2 {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest2.class);
 
-  private static int THREAD_NUMBER = 100;
+  private static int THREAD_NUMBER = 1;
 
   private static int DEVICE_NUMBER = 20000;
 
-  private static int SENSOR_NUMBER = 500;
-
   private static int READ_LOOP = 10000000;
 
   private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L;
 
-  private static List<String> measurements;
-
-  private static List<TSDataType> types;
-
-  private static AtomicInteger totalRowNumber = new AtomicInteger();
-
   private static Random r;
 
   /** Build a custom SessionPool for this example */
@@ -71,6 +62,7 @@ public class ReadTest2 {
             .nodeUrls(nodeUrls)
             .user("root")
             .password("root")
+            .timeOut(180000)
             .maxSize(500)
             .build();
     sessionPool.setFetchSize(10000);
@@ -133,45 +125,24 @@ public class ReadTest2 {
 
     r = new Random();
 
-    // Run raw query
-    SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
-    Thread[] rawReadThreads = new Thread[THREAD_NUMBER];
+    // Run last query
+    SyncReadSignal lastQuerySignal =
+        new SyncReadSignal(THREAD_NUMBER, "Last Value Query with 1000w");
+    Thread[] lastReadThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
-      rawReadThreads[i] =
+      lastReadThreads[i] =
           new Thread(
-              new ReaderThread(rawQuerySignal) {
+              new ReaderThread(lastQuerySignal) {
                 @Override
                 protected void executeQuery()
                     throws IoTDBConnectionException, StatementExecutionException {
-                  queryRawValue();
+                  queryLastValue();
                 }
               });
     }
-    for (Thread thread : rawReadThreads) {
+    for (Thread thread : lastReadThreads) {
       thread.start();
     }
-
-    // Run avg query
-    SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
-    Thread[] avgReadThreads = new Thread[THREAD_NUMBER];
-    for (int i = 0; i < THREAD_NUMBER; i++) {
-      avgReadThreads[i] =
-          new Thread(
-              new ReaderThread(avgQuerySignal) {
-                @Override
-                protected void executeQuery()
-                    throws IoTDBConnectionException, StatementExecutionException {
-                  queryAvgValueGroupBy5Min();
-                }
-              });
-    }
-    for (Thread thread : avgReadThreads) {
-      thread.start();
-    }
-
-    for (Thread thread : avgReadThreads) {
-      thread.join();
-    }
   }
 
   private abstract static class ReaderThread implements Runnable {
@@ -209,22 +180,7 @@ public class ReadTest2 {
   private static void queryLastValue()
       throws IoTDBConnectionException, StatementExecutionException {
     int device = r.nextInt(DEVICE_NUMBER);
-    String sql = "select last(s_1) from root.test.g_0.d_" + device;
-    executeQuery(sql);
-  }
-
-  private static void queryRawValue() throws IoTDBConnectionException, StatementExecutionException {
-    int device = r.nextInt(DEVICE_NUMBER);
-    String sql = String.format("select s_1 from root.test.g_0.d_%s limit 1 offset 10", device);
-    executeQuery(sql);
-  }
-
-  private static void queryAvgValueGroupBy5Min()
-      throws IoTDBConnectionException, StatementExecutionException {
-    int device = r.nextInt(DEVICE_NUMBER);
-    String sql =
-        String.format(
-            "select avg(s_1) from root.test.g_0.d_%s GROUP BY ([now()-1d, now()), 5m)", device);
+    String sql = "select last(*) from root.test.**";
     executeQuery(sql);
   }
 
diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest.java b/example/session/src/main/java/org/apache/iotdb/ReadTest3.java
similarity index 87%
copy from example/session/src/main/java/org/apache/iotdb/ReadTest.java
copy to example/session/src/main/java/org/apache/iotdb/ReadTest3.java
index 90bed6b8cd..cb280588d2 100644
--- a/example/session/src/main/java/org/apache/iotdb/ReadTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/ReadTest3.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,31 +31,27 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
 
-public class ReadTest {
+public class ReadTest3 {
 
   private static SessionPool sessionPool;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest3.class);
 
   private static int THREAD_NUMBER = 100;
 
   private static int DEVICE_NUMBER = 20000;
 
-  private static int SENSOR_NUMBER = 500;
-
   private static int READ_LOOP = 10000000;
 
   private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L;
-
-  private static List<String> measurements;
-
-  private static List<TSDataType> types;
-
-  private static AtomicInteger totalRowNumber = new AtomicInteger();
-
   private static Random r;
+  private static SyncReadSignal lastQuerySignal =
+      new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+  private static SyncReadSignal rawQuerySignal =
+      new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+  private static SyncReadSignal avgQuerySignal =
+      new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
 
   /** Build a custom SessionPool for this example */
 
@@ -91,22 +86,20 @@ public class ReadTest {
     }
 
     protected void syncCountDownBeforeRead() {
-      if (needResetLatch) {
-        synchronized (this) {
-          if (needResetLatch) {
-            latch = new CountDownLatch(this.count);
-            needResetLatch = false;
-            totalCost = 0L;
-            currentTimestamp = System.nanoTime();
-          }
+      synchronized (this) {
+        if (needResetLatch) {
+          latch = new CountDownLatch(this.count);
+          needResetLatch = false;
+          totalCost = 0L;
+          currentTimestamp = System.nanoTime();
         }
       }
     }
 
     protected void finishReadAndWait(long cost, int loopIndex) throws InterruptedException {
       CountDownLatch currentLatch = latch;
-      totalCost += cost;
       synchronized (this) {
+        totalCost += cost;
         currentLatch.countDown();
         if (currentLatch.getCount() == 0) {
           needResetLatch = true;
@@ -135,7 +128,7 @@ public class ReadTest {
     r = new Random();
 
     // Run last query
-    SyncReadSignal lastQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+
     Thread[] lastReadThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       lastReadThreads[i] =
@@ -146,14 +139,15 @@ public class ReadTest {
                     throws IoTDBConnectionException, StatementExecutionException {
                   queryLastValue();
                 }
-              });
+              },
+              "lastValueQuery-" + i);
     }
     for (Thread thread : lastReadThreads) {
       thread.start();
     }
 
     // Run raw query
-    SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+
     Thread[] rawReadThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       rawReadThreads[i] =
@@ -164,14 +158,15 @@ public class ReadTest {
                     throws IoTDBConnectionException, StatementExecutionException {
                   queryRawValue();
                 }
-              });
+              },
+              "rawValueQuery-" + i);
     }
     for (Thread thread : rawReadThreads) {
       thread.start();
     }
 
     // Run avg query
-    SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
+
     Thread[] avgReadThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       avgReadThreads[i] =
@@ -182,7 +177,8 @@ public class ReadTest {
                     throws IoTDBConnectionException, StatementExecutionException {
                   queryAvgValueGroupBy5Min();
                 }
-              });
+              },
+              "avgValueQuery-" + i);
     }
     for (Thread thread : avgReadThreads) {
       thread.start();
@@ -227,20 +223,20 @@ public class ReadTest {
 
   private static void queryLastValue()
       throws IoTDBConnectionException, StatementExecutionException {
-    int device = r.nextInt(DEVICE_NUMBER);
+    int device = r.nextInt(100);
     String sql = "select last(s_1) from root.test.g_0.d_" + device;
     executeQuery(sql);
   }
 
   private static void queryRawValue() throws IoTDBConnectionException, StatementExecutionException {
-    int device = r.nextInt(DEVICE_NUMBER);
+    int device = r.nextInt(100);
     String sql = String.format("select s_1 from root.test.g_0.d_%s limit 1 offset 10", device);
     executeQuery(sql);
   }
 
   private static void queryAvgValueGroupBy5Min()
       throws IoTDBConnectionException, StatementExecutionException {
-    int device = r.nextInt(DEVICE_NUMBER);
+    int device = r.nextInt(100);
     String sql =
         String.format(
             "select avg(s_1) from root.test.g_0.d_%s GROUP BY ([now()-1d, now()), 5m)", device);
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTest.java b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
index 7dc9c5029d..3f928fe515 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
@@ -88,13 +88,11 @@ public class WriteTest {
     }
 
     protected void syncCountDownBeforeInsert() {
-      if (needResetLatch) {
-        synchronized (this) {
-          if (needResetLatch) {
-            latch = new CountDownLatch(this.count);
-            needResetLatch = false;
-            currentTimestamp = System.currentTimeMillis();
-          }
+      synchronized (this) {
+        if (needResetLatch) {
+          latch = new CountDownLatch(this.count);
+          needResetLatch = false;
+          currentTimestamp = System.currentTimeMillis();
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index b29538feaf..907a1a491b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -321,7 +321,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     // The schema cache R/W and fetch operation must be locked together thus the cache clean
     // operation executed by delete timeseries will be effective.
     schemaCache.takeReadLock();
-    templateSchemaCache.takeReadLock();
     try {
       List<ISchemaComputationWithAutoCreation> templateTimeSeriesRequestList = new ArrayList<>();
       List<Pair<Template, PartialPath>> templateSetInfoList = new ArrayList<>();
@@ -349,7 +348,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
     } finally {
       schemaCache.releaseReadLock();
-      templateSchemaCache.releaseReadLock();
     }
   }
 
@@ -357,7 +355,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
   public void fetchAndComputeSchemaWithAutoCreateForFastWrite(
       ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
     schemaCache.takeReadLock();
-    templateSchemaCache.takeReadLock();
     try {
 
       Pair<Template, PartialPath> templateSetInfo =
@@ -389,7 +386,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
     } finally {
       schemaCache.releaseReadLock();
-      templateSchemaCache.releaseReadLock();
     }
   }