You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/01/21 00:39:18 UTC

[iotdb] branch master updated: add executeRawDataQuery in SessionPool (#2522)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 58dfb2f  add executeRawDataQuery in SessionPool (#2522)
58dfb2f is described below

commit 58dfb2f092dad46d971c2211b0dc7501b84dac64
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Jan 21 08:38:51 2021 +0800

    add executeRawDataQuery in SessionPool (#2522)
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 22 +++++++++++++++
 .../apache/iotdb/session/pool/SessionPoolTest.java | 33 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 9e9e5fd..5d86dc0 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -1012,4 +1012,26 @@ public class SessionPool {
       }
     }
   }
+
+  public SessionDataSetWrapper executeRawDataQuery(List<String> paths, long startTime, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        SessionDataSet resp = session.executeRawDataQuery(paths, startTime, endTime);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session); // presumably stays occupied until the caller closes the wrapper
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // Connection failure: log it, hand this session to the cleanup helper, then retry with a new one.
+        logger.warn("executeRawDataQuery failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException e) {
+        putBack(session); // hand the session back before rethrowing; no retry for statement errors
+        throw e;
+      }
+    }
+    // Not expected to be reached; presumably the cleanup helper rethrows on the last retry (TODO confirm).
+    return null;
+  }
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index c426e90..db784c1 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -163,6 +165,35 @@ public class SessionPoolTest {
   }
 
   @Test
+  public void executeRawDataQuery() throws InterruptedException {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    ExecutorService service = Executors.newFixedThreadPool(10); // 10 workers share the one pool
+    write10Data(pool, true);
+    List<String> pathList = new ArrayList<>();
+    pathList.add("root.sg1.d1.s1");
+    for (int i = 0; i < 10; i++) {
+      final int no = i; // effectively-final copy so the lambda can capture the loop index
+      service.submit(() -> {
+        try {
+          SessionDataSetWrapper wrapper = pool.executeRawDataQuery(pathList, no, no + 1);
+          if (wrapper.hasNext()) {
+            Assert.assertEquals(no, wrapper.sessionDataSet.next().getTimestamp());
+          }
+          pool.closeResultSet(wrapper); // skipped if the assertion above throws (Error, not Exception)
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail(); // NOTE(review): thrown on a worker thread; the discarded Future captures it
+        }
+      });
+    }
+    service.shutdown();
+    assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+    assertTrue(pool.currentAvailableSize() <= 3);
+    assertEquals(0, pool.currentOccupiedSize()); // every wrapper closed => no session still occupied
+    pool.close();
+  }
+
+  @Test
   public void tryIfTheServerIsRestart() {
     SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null);
     write10Data(pool, true);
@@ -243,7 +274,7 @@ public class SessionPoolTest {
     SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null);
     pool.close();
     try {
-      pool.insertRecord("root.sg1.d1", 1, Collections.singletonList("s1" ),
+      pool.insertRecord("root.sg1.d1", 1, Collections.singletonList("s1"),
           Collections.singletonList(TSDataType.INT64),
           Collections.singletonList(1L));
     } catch (IoTDBConnectionException e) {