You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/17 14:09:56 UTC

[iotdb] branch guonengtest updated: spotless and fix bug

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch guonengtest
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/guonengtest by this push:
     new 25206874e4 spotless and fix bug
25206874e4 is described below

commit 25206874e4b3c72962e0de4ec375437d3ba18ea5
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 17 22:09:42 2023 +0800

    spotless and fix bug
---
 .../java/org/apache/iotdb/SessionPoolExample.java  | 86 ++++++++++++----------
 1 file changed, 49 insertions(+), 37 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index e3a82c75ea..f348a20ee8 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb;
 
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.isession.SessionDataSet.DataIterator;
 import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -32,9 +31,10 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class SessionPoolExample {
 
@@ -65,9 +65,10 @@ public class SessionPoolExample {
   /** Build a redirect-able SessionPool for this example */
   private static void constructRedirectSessionPool() {
     List<String> nodeUrls = new ArrayList<>();
-    nodeUrls.add("10.24.58.58:6667");
-    nodeUrls.add("10.24.58.67:6667");
-    nodeUrls.add("10.24.58.69:6667");
+    //    nodeUrls.add("127.0.0.1:6667");
+    nodeUrls.add("192.168.130.16:6667");
+    nodeUrls.add("192.168.130.17:6667");
+    nodeUrls.add("192.168.130.18:6667");
     sessionPool =
         new SessionPool.Builder()
             .nodeUrls(nodeUrls)
@@ -79,17 +80,16 @@ public class SessionPoolExample {
   }
 
   private static class SyncWriteSignal {
-    protected volatile boolean latchInitialized = false;
+    protected volatile boolean needResetLatch = true;
     protected CountDownLatch latch;
     protected long currentTimestamp;
 
-
     protected void syncCountDownBeforeInsert() {
-      if (!latchInitialized) {
+      if (needResetLatch) {
         synchronized (this) {
-          if (!latchInitialized) {
+          if (needResetLatch) {
             latch = new CountDownLatch(THREAD_NUMBER);
-            latchInitialized = true;
+            needResetLatch = false;
             currentTimestamp = System.currentTimeMillis();
           }
         }
@@ -97,10 +97,16 @@ public class SessionPoolExample {
     }
 
     protected void finishInsert() {
-      latch.countDown();
-      if (latch.getCount() == 0) {
-        LOGGER.info("one loop finished. cost: {}ms. total rows: {}", (System.currentTimeMillis() - currentTimestamp), totalRowNumber.get());
+      if (latch.getCount() == 1) {
+        synchronized (this) {
+          needResetLatch = true;
+          LOGGER.info(
+              "one loop finished. cost: {}ms. total rows: {}",
+              (System.currentTimeMillis() - currentTimestamp),
+              totalRowNumber.get());
+        }
       }
+      latch.countDown();
     }
 
     protected void waitCurrentLoopFinished() throws InterruptedException {
@@ -122,11 +128,13 @@ public class SessionPoolExample {
       for (int j = 0; j < TOTAL_BATCH_COUNT_PER_DEVICE; j++) {
         signal.syncCountDownBeforeInsert();
         try {
-          insertRecords(index);
+          int insertDeviceCount = insertRecords(index);
+          totalRowNumber.addAndGet(insertDeviceCount);
+          signal.finishInsert();
+          signal.waitCurrentLoopFinished();
         } catch (Exception e) {
           LOGGER.error("insert error. Thread: {}. Error:", index, e);
         }
-        totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER);
       }
     }
   }
@@ -145,26 +153,11 @@ public class SessionPoolExample {
     Thread[] threads = new Thread[THREAD_NUMBER];
 
     SyncWriteSignal signal = new SyncWriteSignal();
-    for (int i = 0 ; i < THREAD_NUMBER; i ++) {
+    for (int i = 0; i < THREAD_NUMBER; i++) {
       threads[i] = new Thread(new InsertWorker(signal, i));
     }
 
-    // count total execution time
-    long startTime = System.currentTimeMillis();
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  sessionPool.close();
-                  System.out.println(System.currentTimeMillis() - startTime);
-                }));
-
-    // start write thread
-    for (Thread thread : threads) {
-      thread.start();
-    }
-
-    r = new Random();
+    // initialize read thread
     Thread[] readThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       readThreads[i] =
@@ -180,11 +173,26 @@ public class SessionPoolExample {
               });
     }
 
-    // start read
-    for (Thread thread : readThreads) {
+    // count total execution time
+    r = new Random();
+    long startTime = System.currentTimeMillis();
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  sessionPool.close();
+                  System.out.println(System.currentTimeMillis() - startTime);
+                }));
+
+    // start write thread
+    for (Thread thread : threads) {
       thread.start();
     }
 
+    // start read
+    //    for (Thread thread : readThreads) {
+    //      thread.start();
+    //    }
 
     long startTime1 = System.nanoTime();
     new Thread(
@@ -205,15 +213,17 @@ public class SessionPoolExample {
   }
 
   // more insert example, see SessionExample.java
-  private static void insertRecords(int threadIndex) throws StatementExecutionException, IoTDBConnectionException {
+  private static int insertRecords(int threadIndex)
+      throws StatementExecutionException, IoTDBConnectionException {
     long time = insertTime.get();
     List<String> deviceIds = new ArrayList<>();
     List<Long> times = new ArrayList<>();
     List<List<String>> measurementsList = new ArrayList<>();
     List<List<TSDataType>> typesList = new ArrayList<>();
     List<List<Object>> valuesList = new ArrayList<>();
-    for (int j = 0; j < DEVICE_NUMBER / THREAD_NUMBER; j++) {
-      String deviceId = "root.test.g_0.d_" + (DEVICE_NUMBER / THREAD_NUMBER * threadIndex + j);
+    int deviceCount = 0;
+    for (int j = threadIndex; j < DEVICE_NUMBER; j += THREAD_NUMBER) {
+      String deviceId = "root.test.g_0.d_" + j;
       deviceIds.add(deviceId);
       times.add(time);
       List<Object> values = new ArrayList<>();
@@ -223,9 +233,11 @@ public class SessionPoolExample {
       valuesList.add(values);
       measurementsList.add(measurements);
       typesList.add(types);
+      deviceCount++;
     }
 
     sessionPool.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList);
+    return deviceCount;
   }
 
   private static void queryByIterator()