You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/01/21 00:39:18 UTC
[iotdb] branch master updated: add executeRawDataQuery in SessionPool (#2522)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 58dfb2f add executeRawDataQuery in SessionPool (#2522)
58dfb2f is described below
commit 58dfb2f092dad46d971c2211b0dc7501b84dac64
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Jan 21 08:38:51 2021 +0800
add executeRawDataQuery in SessionPool (#2522)
---
.../org/apache/iotdb/session/pool/SessionPool.java | 22 +++++++++++++++
.../apache/iotdb/session/pool/SessionPoolTest.java | 33 +++++++++++++++++++++-
2 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 9e9e5fd..5d86dc0 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -1012,4 +1012,26 @@ public class SessionPool {
}
}
}
+
+ public SessionDataSetWrapper executeRawDataQuery(List<String> paths, long startTime, long endTime)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ SessionDataSet resp = session.executeRawDataQuery(paths, startTime, endTime);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeRawDataQuery failed", e);
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ // never go here
+ return null;
+ }
}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index c426e90..db784c1 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -163,6 +165,35 @@ public class SessionPoolTest {
}
@Test
+ public void executeRawDataQuery() throws InterruptedException {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ write10Data(pool, true);
+ List<String> pathList = new ArrayList<>();
+ pathList.add("root.sg1.d1.s1");
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ SessionDataSetWrapper wrapper = pool.executeRawDataQuery(pathList, no, no + 1);
+ if (wrapper.hasNext()) {
+ Assert.assertEquals(no, wrapper.sessionDataSet.next().getTimestamp());
+ }
+ pool.closeResultSet(wrapper);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ assertTrue(pool.currentAvailableSize() <= 3);
+ assertEquals(0, pool.currentOccupiedSize());
+ pool.close();
+ }
+
+ @Test
public void tryIfTheServerIsRestart() {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null);
write10Data(pool, true);
@@ -243,7 +274,7 @@ public class SessionPoolTest {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null);
pool.close();
try {
- pool.insertRecord("root.sg1.d1", 1, Collections.singletonList("s1" ),
+ pool.insertRecord("root.sg1.d1", 1, Collections.singletonList("s1"),
Collections.singletonList(TSDataType.INT64),
Collections.singletonList(1L));
} catch (IoTDBConnectionException e) {