You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/25 12:14:33 UTC
[iotdb] branch fast_write_test_with_guoneng updated: fix client concurrent issue
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fast_write_test_with_guoneng by this push:
new a833d1ac7a fix client concurrent issue
a833d1ac7a is described below
commit a833d1ac7aca1b0632a7e0a3206798dbb86d0555
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Apr 25 20:14:20 2023 +0800
fix client concurrent issue
---
.../src/main/java/org/apache/iotdb/ReadTest.java | 35 ++++++-----
.../src/main/java/org/apache/iotdb/ReadTest2.java | 68 ++++------------------
.../apache/iotdb/{ReadTest.java => ReadTest3.java} | 58 +++++++++---------
.../src/main/java/org/apache/iotdb/WriteTest.java | 12 ++--
.../plan/analyze/schema/ClusterSchemaFetcher.java | 4 --
5 files changed, 61 insertions(+), 116 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest.java b/example/session/src/main/java/org/apache/iotdb/ReadTest.java
index 90bed6b8cd..c88f90697d 100644
--- a/example/session/src/main/java/org/apache/iotdb/ReadTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/ReadTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +31,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
public class ReadTest {
@@ -40,23 +38,20 @@ public class ReadTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest.class);
- private static int THREAD_NUMBER = 100;
+ private static int THREAD_NUMBER = 300;
private static int DEVICE_NUMBER = 20000;
- private static int SENSOR_NUMBER = 500;
-
private static int READ_LOOP = 10000000;
private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L;
-
- private static List<String> measurements;
-
- private static List<TSDataType> types;
-
- private static AtomicInteger totalRowNumber = new AtomicInteger();
-
private static Random r;
+ private static SyncReadSignal lastQuerySignal =
+ new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+ private static SyncReadSignal rawQuerySignal =
+ new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+ private static SyncReadSignal avgQuerySignal =
+ new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
/** Build a custom SessionPool for this example */
@@ -94,6 +89,7 @@ public class ReadTest {
if (needResetLatch) {
synchronized (this) {
if (needResetLatch) {
+ LOGGER.info("[{}] RESET SIGNAL", queryName);
latch = new CountDownLatch(this.count);
needResetLatch = false;
totalCost = 0L;
@@ -135,7 +131,7 @@ public class ReadTest {
r = new Random();
// Run last query
- SyncReadSignal lastQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+
Thread[] lastReadThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
lastReadThreads[i] =
@@ -146,14 +142,15 @@ public class ReadTest {
throws IoTDBConnectionException, StatementExecutionException {
queryLastValue();
}
- });
+ },
+ "lastValueQuery-" + i);
}
for (Thread thread : lastReadThreads) {
thread.start();
}
// Run raw query
- SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+
Thread[] rawReadThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
rawReadThreads[i] =
@@ -164,14 +161,15 @@ public class ReadTest {
throws IoTDBConnectionException, StatementExecutionException {
queryRawValue();
}
- });
+ },
+ "rawValueQuery-" + i);
}
for (Thread thread : rawReadThreads) {
thread.start();
}
// Run avg query
- SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
+
Thread[] avgReadThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
avgReadThreads[i] =
@@ -182,7 +180,8 @@ public class ReadTest {
throws IoTDBConnectionException, StatementExecutionException {
queryAvgValueGroupBy5Min();
}
- });
+ },
+ "avgValueQuery-" + i);
}
for (Thread thread : avgReadThreads) {
thread.start();
diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest2.java b/example/session/src/main/java/org/apache/iotdb/ReadTest2.java
index 4f5d6cece9..2d8e7670d9 100644
--- a/example/session/src/main/java/org/apache/iotdb/ReadTest2.java
+++ b/example/session/src/main/java/org/apache/iotdb/ReadTest2.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
public class ReadTest2 {
@@ -39,22 +38,14 @@ public class ReadTest2 {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest2.class);
- private static int THREAD_NUMBER = 100;
+ private static int THREAD_NUMBER = 1;
private static int DEVICE_NUMBER = 20000;
- private static int SENSOR_NUMBER = 500;
-
private static int READ_LOOP = 10000000;
private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L;
- private static List<String> measurements;
-
- private static List<TSDataType> types;
-
- private static AtomicInteger totalRowNumber = new AtomicInteger();
-
private static Random r;
/** Build a custom SessionPool for this example */
@@ -71,6 +62,7 @@ public class ReadTest2 {
.nodeUrls(nodeUrls)
.user("root")
.password("root")
+ .timeOut(180000)
.maxSize(500)
.build();
sessionPool.setFetchSize(10000);
@@ -133,45 +125,24 @@ public class ReadTest2 {
r = new Random();
- // Run raw query
- SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
- Thread[] rawReadThreads = new Thread[THREAD_NUMBER];
+ // Run last query
+ SyncReadSignal lastQuerySignal =
+ new SyncReadSignal(THREAD_NUMBER, "Last Value Query with 1000w");
+ Thread[] lastReadThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
- rawReadThreads[i] =
+ lastReadThreads[i] =
new Thread(
- new ReaderThread(rawQuerySignal) {
+ new ReaderThread(lastQuerySignal) {
@Override
protected void executeQuery()
throws IoTDBConnectionException, StatementExecutionException {
- queryRawValue();
+ queryLastValue();
}
});
}
- for (Thread thread : rawReadThreads) {
+ for (Thread thread : lastReadThreads) {
thread.start();
}
-
- // Run avg query
- SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
- Thread[] avgReadThreads = new Thread[THREAD_NUMBER];
- for (int i = 0; i < THREAD_NUMBER; i++) {
- avgReadThreads[i] =
- new Thread(
- new ReaderThread(avgQuerySignal) {
- @Override
- protected void executeQuery()
- throws IoTDBConnectionException, StatementExecutionException {
- queryAvgValueGroupBy5Min();
- }
- });
- }
- for (Thread thread : avgReadThreads) {
- thread.start();
- }
-
- for (Thread thread : avgReadThreads) {
- thread.join();
- }
}
private abstract static class ReaderThread implements Runnable {
@@ -209,22 +180,7 @@ public class ReadTest2 {
private static void queryLastValue()
throws IoTDBConnectionException, StatementExecutionException {
int device = r.nextInt(DEVICE_NUMBER);
- String sql = "select last(s_1) from root.test.g_0.d_" + device;
- executeQuery(sql);
- }
-
- private static void queryRawValue() throws IoTDBConnectionException, StatementExecutionException {
- int device = r.nextInt(DEVICE_NUMBER);
- String sql = String.format("select s_1 from root.test.g_0.d_%s limit 1 offset 10", device);
- executeQuery(sql);
- }
-
- private static void queryAvgValueGroupBy5Min()
- throws IoTDBConnectionException, StatementExecutionException {
- int device = r.nextInt(DEVICE_NUMBER);
- String sql =
- String.format(
- "select avg(s_1) from root.test.g_0.d_%s GROUP BY ([now()-1d, now()), 5m)", device);
+ String sql = "select last(*) from root.test.**";
executeQuery(sql);
}
diff --git a/example/session/src/main/java/org/apache/iotdb/ReadTest.java b/example/session/src/main/java/org/apache/iotdb/ReadTest3.java
similarity index 87%
copy from example/session/src/main/java/org/apache/iotdb/ReadTest.java
copy to example/session/src/main/java/org/apache/iotdb/ReadTest3.java
index 90bed6b8cd..cb280588d2 100644
--- a/example/session/src/main/java/org/apache/iotdb/ReadTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/ReadTest3.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,31 +31,27 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-public class ReadTest {
+public class ReadTest3 {
private static SessionPool sessionPool;
- private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReadTest3.class);
private static int THREAD_NUMBER = 100;
private static int DEVICE_NUMBER = 20000;
- private static int SENSOR_NUMBER = 500;
-
private static int READ_LOOP = 10000000;
private static long LOOP_INTERVAL_IN_NS = 3_000_000_000L;
-
- private static List<String> measurements;
-
- private static List<TSDataType> types;
-
- private static AtomicInteger totalRowNumber = new AtomicInteger();
-
private static Random r;
+ private static SyncReadSignal lastQuerySignal =
+ new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+ private static SyncReadSignal rawQuerySignal =
+ new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+ private static SyncReadSignal avgQuerySignal =
+ new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
/** Build a custom SessionPool for this example */
@@ -91,22 +86,20 @@ public class ReadTest {
}
protected void syncCountDownBeforeRead() {
- if (needResetLatch) {
- synchronized (this) {
- if (needResetLatch) {
- latch = new CountDownLatch(this.count);
- needResetLatch = false;
- totalCost = 0L;
- currentTimestamp = System.nanoTime();
- }
+ synchronized (this) {
+ if (needResetLatch) {
+ latch = new CountDownLatch(this.count);
+ needResetLatch = false;
+ totalCost = 0L;
+ currentTimestamp = System.nanoTime();
}
}
}
protected void finishReadAndWait(long cost, int loopIndex) throws InterruptedException {
CountDownLatch currentLatch = latch;
- totalCost += cost;
synchronized (this) {
+ totalCost += cost;
currentLatch.countDown();
if (currentLatch.getCount() == 0) {
needResetLatch = true;
@@ -135,7 +128,7 @@ public class ReadTest {
r = new Random();
// Run last query
- SyncReadSignal lastQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Last Value Query");
+
Thread[] lastReadThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
lastReadThreads[i] =
@@ -146,14 +139,15 @@ public class ReadTest {
throws IoTDBConnectionException, StatementExecutionException {
queryLastValue();
}
- });
+ },
+ "lastValueQuery-" + i);
}
for (Thread thread : lastReadThreads) {
thread.start();
}
// Run raw query
- SyncReadSignal rawQuerySignal = new SyncReadSignal(THREAD_NUMBER, "Raw Value Query");
+
Thread[] rawReadThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
rawReadThreads[i] =
@@ -164,14 +158,15 @@ public class ReadTest {
throws IoTDBConnectionException, StatementExecutionException {
queryRawValue();
}
- });
+ },
+ "rawValueQuery-" + i);
}
for (Thread thread : rawReadThreads) {
thread.start();
}
// Run avg query
- SyncReadSignal avgQuerySignal = new SyncReadSignal(THREAD_NUMBER, "AVG Query GROUP BY 5min");
+
Thread[] avgReadThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
avgReadThreads[i] =
@@ -182,7 +177,8 @@ public class ReadTest {
throws IoTDBConnectionException, StatementExecutionException {
queryAvgValueGroupBy5Min();
}
- });
+ },
+ "avgValueQuery-" + i);
}
for (Thread thread : avgReadThreads) {
thread.start();
@@ -227,20 +223,20 @@ public class ReadTest {
private static void queryLastValue()
throws IoTDBConnectionException, StatementExecutionException {
- int device = r.nextInt(DEVICE_NUMBER);
+ int device = r.nextInt(100);
String sql = "select last(s_1) from root.test.g_0.d_" + device;
executeQuery(sql);
}
private static void queryRawValue() throws IoTDBConnectionException, StatementExecutionException {
- int device = r.nextInt(DEVICE_NUMBER);
+ int device = r.nextInt(100);
String sql = String.format("select s_1 from root.test.g_0.d_%s limit 1 offset 10", device);
executeQuery(sql);
}
private static void queryAvgValueGroupBy5Min()
throws IoTDBConnectionException, StatementExecutionException {
- int device = r.nextInt(DEVICE_NUMBER);
+ int device = r.nextInt(100);
String sql =
String.format(
"select avg(s_1) from root.test.g_0.d_%s GROUP BY ([now()-1d, now()), 5m)", device);
diff --git a/example/session/src/main/java/org/apache/iotdb/WriteTest.java b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
index 7dc9c5029d..3f928fe515 100644
--- a/example/session/src/main/java/org/apache/iotdb/WriteTest.java
+++ b/example/session/src/main/java/org/apache/iotdb/WriteTest.java
@@ -88,13 +88,11 @@ public class WriteTest {
}
protected void syncCountDownBeforeInsert() {
- if (needResetLatch) {
- synchronized (this) {
- if (needResetLatch) {
- latch = new CountDownLatch(this.count);
- needResetLatch = false;
- currentTimestamp = System.currentTimeMillis();
- }
+ synchronized (this) {
+ if (needResetLatch) {
+ latch = new CountDownLatch(this.count);
+ needResetLatch = false;
+ currentTimestamp = System.currentTimeMillis();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index b29538feaf..907a1a491b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -321,7 +321,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
// The schema cache R/W and fetch operation must be locked together thus the cache clean
// operation executed by delete timeseries will be effective.
schemaCache.takeReadLock();
- templateSchemaCache.takeReadLock();
try {
List<ISchemaComputationWithAutoCreation> templateTimeSeriesRequestList = new ArrayList<>();
List<Pair<Template, PartialPath>> templateSetInfoList = new ArrayList<>();
@@ -349,7 +348,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
} finally {
schemaCache.releaseReadLock();
- templateSchemaCache.releaseReadLock();
}
}
@@ -357,7 +355,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
public void fetchAndComputeSchemaWithAutoCreateForFastWrite(
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
schemaCache.takeReadLock();
- templateSchemaCache.takeReadLock();
try {
Pair<Template, PartialPath> templateSetInfo =
@@ -389,7 +386,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
} finally {
schemaCache.releaseReadLock();
- templateSchemaCache.releaseReadLock();
}
}