You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/17 14:09:56 UTC
[iotdb] branch guonengtest updated: spotless and fix bug
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch guonengtest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/guonengtest by this push:
new 25206874e4 spotless and fix bug
25206874e4 is described below
commit 25206874e4b3c72962e0de4ec375437d3ba18ea5
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 17 22:09:42 2023 +0800
spotless and fix bug
---
.../java/org/apache/iotdb/SessionPoolExample.java | 86 ++++++++++++----------
1 file changed, 49 insertions(+), 37 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index e3a82c75ea..f348a20ee8 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.isession.SessionDataSet.DataIterator;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -32,9 +31,10 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
public class SessionPoolExample {
@@ -65,9 +65,10 @@ public class SessionPoolExample {
/** Build a redirect-able SessionPool for this example */
private static void constructRedirectSessionPool() {
List<String> nodeUrls = new ArrayList<>();
- nodeUrls.add("10.24.58.58:6667");
- nodeUrls.add("10.24.58.67:6667");
- nodeUrls.add("10.24.58.69:6667");
+ // nodeUrls.add("127.0.0.1:6667");
+ nodeUrls.add("192.168.130.16:6667");
+ nodeUrls.add("192.168.130.17:6667");
+ nodeUrls.add("192.168.130.18:6667");
sessionPool =
new SessionPool.Builder()
.nodeUrls(nodeUrls)
@@ -79,17 +80,16 @@ public class SessionPoolExample {
}
private static class SyncWriteSignal {
- protected volatile boolean latchInitialized = false;
+ protected volatile boolean needResetLatch = true;
protected CountDownLatch latch;
protected long currentTimestamp;
-
protected void syncCountDownBeforeInsert() {
- if (!latchInitialized) {
+ if (needResetLatch) {
synchronized (this) {
- if (!latchInitialized) {
+ if (needResetLatch) {
latch = new CountDownLatch(THREAD_NUMBER);
- latchInitialized = true;
+ needResetLatch = false;
currentTimestamp = System.currentTimeMillis();
}
}
@@ -97,10 +97,16 @@ public class SessionPoolExample {
}
protected void finishInsert() {
- latch.countDown();
- if (latch.getCount() == 0) {
- LOGGER.info("one loop finished. cost: {}ms. total rows: {}", (System.currentTimeMillis() - currentTimestamp), totalRowNumber.get());
+ if (latch.getCount() == 1) {
+ synchronized (this) {
+ needResetLatch = true;
+ LOGGER.info(
+ "one loop finished. cost: {}ms. total rows: {}",
+ (System.currentTimeMillis() - currentTimestamp),
+ totalRowNumber.get());
+ }
}
+ latch.countDown();
}
protected void waitCurrentLoopFinished() throws InterruptedException {
@@ -122,11 +128,13 @@ public class SessionPoolExample {
for (int j = 0; j < TOTAL_BATCH_COUNT_PER_DEVICE; j++) {
signal.syncCountDownBeforeInsert();
try {
- insertRecords(index);
+ int insertDeviceCount = insertRecords(index);
+ totalRowNumber.addAndGet(insertDeviceCount);
+ signal.finishInsert();
+ signal.waitCurrentLoopFinished();
} catch (Exception e) {
LOGGER.error("insert error. Thread: {}. Error:", index, e);
}
- totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER);
}
}
}
@@ -145,26 +153,11 @@ public class SessionPoolExample {
Thread[] threads = new Thread[THREAD_NUMBER];
SyncWriteSignal signal = new SyncWriteSignal();
- for (int i = 0 ; i < THREAD_NUMBER; i ++) {
+ for (int i = 0; i < THREAD_NUMBER; i++) {
threads[i] = new Thread(new InsertWorker(signal, i));
}
- // count total execution time
- long startTime = System.currentTimeMillis();
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread(
- () -> {
- sessionPool.close();
- System.out.println(System.currentTimeMillis() - startTime);
- }));
-
- // start write thread
- for (Thread thread : threads) {
- thread.start();
- }
-
- r = new Random();
+ // initialize read thread
Thread[] readThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
readThreads[i] =
@@ -180,11 +173,26 @@ public class SessionPoolExample {
});
}
- // start read
- for (Thread thread : readThreads) {
+ // count total execution time
+ r = new Random();
+ long startTime = System.currentTimeMillis();
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ sessionPool.close();
+ System.out.println(System.currentTimeMillis() - startTime);
+ }));
+
+ // start write thread
+ for (Thread thread : threads) {
thread.start();
}
+ // start read
+ // for (Thread thread : readThreads) {
+ // thread.start();
+ // }
long startTime1 = System.nanoTime();
new Thread(
@@ -205,15 +213,17 @@ public class SessionPoolExample {
}
// more insert example, see SessionExample.java
- private static void insertRecords(int threadIndex) throws StatementExecutionException, IoTDBConnectionException {
+ private static int insertRecords(int threadIndex)
+ throws StatementExecutionException, IoTDBConnectionException {
long time = insertTime.get();
List<String> deviceIds = new ArrayList<>();
List<Long> times = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
List<List<TSDataType>> typesList = new ArrayList<>();
List<List<Object>> valuesList = new ArrayList<>();
- for (int j = 0; j < DEVICE_NUMBER / THREAD_NUMBER; j++) {
- String deviceId = "root.test.g_0.d_" + (DEVICE_NUMBER / THREAD_NUMBER * threadIndex + j);
+ int deviceCount = 0;
+ for (int j = threadIndex; j < DEVICE_NUMBER; j += THREAD_NUMBER) {
+ String deviceId = "root.test.g_0.d_" + j;
deviceIds.add(deviceId);
times.add(time);
List<Object> values = new ArrayList<>();
@@ -223,9 +233,11 @@ public class SessionPoolExample {
valuesList.add(values);
measurementsList.add(measurements);
typesList.add(types);
+ deviceCount++;
}
sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
+ return deviceCount;
}
private static void queryByIterator()