You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/25 11:09:22 UTC
[incubator-iotdb] branch cluster_read updated: add large data ut
and add query timeout mechanism
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
new d5d3607 add large data ut and add query timeout mechanism
d5d3607 is described below
commit d5d36071eea921131361cf4770fc6bf4f20ec955
Author: lta <li...@163.com>
AuthorDate: Thu Apr 25 19:09:03 2019 +0800
add large data ut and add query timeout mechanism
---
.../iotdb/cluster/config/ClusterConstant.java | 7 +
.../querynode/ClusterLocalQueryManager.java | 29 +-
.../querynode/ClusterLocalSingleQueryManager.java | 16 +-
.../querynode/IClusterLocalSingleQueryManager.java | 5 +
.../cluster/query/ClusterQueryLargeDataTest.java | 507 +++++++++++++++++++++
5 files changed, 560 insertions(+), 4 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 7ab6f67..839f7a4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -46,4 +46,11 @@ public class ClusterConstant {
*/
public static final int MAX_CACHE_BATCH_DATA_LIST_SIZE = 2;
+ /**
+ * Query timeout in query node. If time interval between last communications with coordinator node
+ * and now exceed this parameter, release corresponding query resource.Each query in query node
+ * has a <code>QueryRepeaterTimer</code>, the unit is milliseconds. Default value is 30 minutes.
+ */
+ public static final int QUERY_TIMEOUT_IN_QUERY_NODE = 30 * 60 * 1000;
+
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 9cce813..2032424 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.cluster.query.manager.querynode;
import com.alipay.sofa.jraft.util.OnlyForTest;
+import com.alipay.sofa.jraft.util.RepeatedTimer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
@@ -34,9 +36,13 @@ import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterLocalQueryManager.class);
+
/**
* Key is task id which is assigned by coordinator node, value is job id which is assigned by
* query node(local).
@@ -48,7 +54,6 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
*/
private static final ConcurrentHashMap<Long, ClusterLocalSingleQueryManager> SINGLE_QUERY_MANAGER_MAP = new ConcurrentHashMap<>();
-
private ClusterLocalQueryManager() {
}
@@ -58,7 +63,9 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
long jobId = QueryResourceManager.getInstance().assignJobId();
String taskId = request.getTaskId();
TASK_ID_MAP_JOB_ID.put(taskId, jobId);
- ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
+ ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId,
+ new QueryRepeaterTimer(taskId,
+ ClusterConstant.QUERY_TIMEOUT_IN_QUERY_NODE));
SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
return localQueryManager.createSeriesReader(request);
}
@@ -124,4 +131,22 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
return SINGLE_QUERY_MANAGER_MAP;
}
+ public class QueryRepeaterTimer extends RepeatedTimer {
+
+ private String taskId;
+
+ public QueryRepeaterTimer(String taskId, int timeoutMs) {
+ super(taskId, timeoutMs);
+ this.taskId = taskId;
+ }
+
+ @Override
+ protected void onTrigger() {
+ try {
+ close(taskId);
+ } catch (FileNodeManagerException e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index abe0bd7..60877fb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
+import org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalQueryManager.QueryRepeaterTimer;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderByTimestamp;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderWithoutTimeGenerator;
import org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
@@ -65,6 +66,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
private String groupId;
/**
+ * Timer of Query, if the time is up, close query resource.
+ */
+ private QueryRepeaterTimer queryRepeaterTimer;
+
+ /**
* Job id assigned by local QueryResourceManager
*/
private long jobId;
@@ -99,8 +105,9 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
/**
* Constructor of ClusterLocalSingleQueryManager
*/
- public ClusterLocalSingleQueryManager(long jobId) {
+ public ClusterLocalSingleQueryManager(long jobId, QueryRepeaterTimer queryRepeaterTimer) {
this.jobId = jobId;
+ this.queryRepeaterTimer = queryRepeaterTimer;
}
@Override
@@ -230,7 +237,6 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
public QuerySeriesDataByTimestampResponse readBatchDataByTimestamp(
QuerySeriesDataByTimestampRequest request)
throws IOException {
- String groupId = request.getGroupID();
QuerySeriesDataByTimestampResponse response = new QuerySeriesDataByTimestampResponse(groupId);
List<String> fetchDataSeries = request.getFetchDataSeries();
long targetQueryRounds = request.getQueryRounds();
@@ -247,6 +253,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
return response;
}
+ @Override
+ public void resetQueryTimer() {
+ queryRepeaterTimer.reset();
+ }
+
/**
* Read batch data of select series
*
@@ -275,6 +286,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
@Override
public void close() throws FileNodeManagerException {
+ queryRepeaterTimer.destroy();
QueryResourceManager.getInstance().endQueryForGivenJob(jobId);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
index e462f91..318772f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
@@ -63,6 +63,11 @@ public interface IClusterLocalSingleQueryManager {
QuerySeriesDataByTimestampRequest request) throws IOException;
/**
+ * Reset query timer and restart timer
+ */
+ void resetQueryTimer();
+
+ /**
* Release query resource
*/
void close() throws FileNodeManagerException;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
new file mode 100644
index 0000000..223f0dc
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query;
+
+import static org.apache.iotdb.cluster.utils.Utils.insertData;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBResultMetadata;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClusterQueryLargeDataTest {
+
+
+ private Server server;
+ private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+ private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+ CLUSTER_CONFIG.getPort());
+
+ private static final String URL = "127.0.0.1:6667/";
+
+ private static final String[] createSQLs1 = {
+ "SET STORAGE GROUP TO root.vehicle",
+ "SET STORAGE GROUP TO root.test",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.test.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.test.d1.g0.s0 WITH DATATYPE=INT32, ENCODING=RLE"
+ };
+ private static final String[] insertSQLs1 = {
+ "insert into root.vehicle.d0(timestamp,s0) values(10,100)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(12,101,'102')",
+ "insert into root.vehicle.d0(timestamp,s1) values(19,'103')",
+ "insert into root.vehicle.d1(timestamp,s2) values(11,104.0)",
+ "insert into root.vehicle.d1(timestamp,s2,s3) values(15,105.0,true)",
+ "insert into root.vehicle.d1(timestamp,s3) values(17,false)",
+ "insert into root.vehicle.d0(timestamp,s0) values(20,1000)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(22,1001,'1002')",
+ "insert into root.vehicle.d0(timestamp,s1) values(29,'1003')",
+ "insert into root.vehicle.d1(timestamp,s2) values(21,1004.0)",
+ "insert into root.vehicle.d1(timestamp,s2,s3) values(25,1005.0,true)",
+ "insert into root.vehicle.d1(timestamp,s3) values(27,true)",
+ "insert into root.test.d0(timestamp,s0) values(10,106)",
+ "insert into root.test.d0(timestamp,s0,s1) values(14,107,'108')",
+ "insert into root.test.d0(timestamp,s1) values(16,'109')",
+ "insert into root.test.d1.g0(timestamp,s0) values(1,110)",
+ "insert into root.test.d0(timestamp,s0) values(30,1006)",
+ "insert into root.test.d0(timestamp,s0,s1) values(34,1007,'1008')",
+ "insert into root.test.d0(timestamp,s1) values(36,'1090')",
+ "insert into root.test.d1.g0(timestamp,s0) values(10,1100)"};
+ private static final String[] insertSqls2 = {
+ "insert into root.vehicle.d0(timestamp,s0) values(6,120)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(38,121,'122')",
+ "insert into root.vehicle.d0(timestamp,s1) values(9,'123')",
+ "insert into root.vehicle.d0(timestamp,s0) values(16,128)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(18,189,'198')",
+ "insert into root.vehicle.d0(timestamp,s1) values(99,'1234')",
+ "insert into root.vehicle.d1(timestamp,s2) values(14,1024.0)",
+ "insert into root.vehicle.d1(timestamp,s2,s3) values(29,1205.0,true)",
+ "insert into root.vehicle.d1(timestamp,s3) values(33,true)",
+ "insert into root.test.d0(timestamp,s0) values(15,126)",
+ "insert into root.test.d0(timestamp,s0,s1) values(8,127,'128')",
+ "insert into root.test.d0(timestamp,s1) values(20,'129')",
+ "insert into root.test.d1.g0(timestamp,s0) values(14,430)",
+ "insert into root.test.d0(timestamp,s0) values(150,426)",
+ "insert into root.test.d0(timestamp,s0,s1) values(80,427,'528')",
+ "insert into root.test.d0(timestamp,s1) values(2,'1209')",
+ "insert into root.test.d1.g0(timestamp,s0) values(4,330)"};
+ private static final String[] createSQLs3 = {
+ "SET STORAGE GROUP TO root.iotdb",
+ "SET STORAGE GROUP TO root.cluster",
+ "CREATE TIMESERIES root.iotdb.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.iotdb.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.iotdb.d1.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.iotdb.d1.s3 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.cluster.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.cluster.d0.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.cluster.d1.g0.s0 WITH DATATYPE=INT32, ENCODING=RLE"
+ };
+ private static final String[] insertSQLs3 = {
+ "DELETE FROM root.vehicle WHERE time < 20",
+ "DELETE FROM root.test WHERE time < 20",
+ "insert into root.iotdb.d0(timestamp,s0) values(3,100)",
+ "insert into root.iotdb.d0(timestamp,s0,s1) values(22,101,'102')",
+ "insert into root.iotdb.d0(timestamp,s1) values(24,'103')",
+ "insert into root.iotdb.d1(timestamp,s2) values(21,104.0)",
+ "insert into root.iotdb.d1(timestamp,s2,s3) values(25,105.0,true)",
+ "insert into root.iotdb.d1(timestamp,s3) values(27,false)",
+ "insert into root.iotdb.d0(timestamp,s0) values(30,1000)",
+ "insert into root.iotdb.d0(timestamp,s0,s1) values(202,101,'102')",
+ "insert into root.iotdb.d0(timestamp,s1) values(44,'103')",
+ "insert into root.iotdb.d1(timestamp,s2) values(1,404.0)",
+ "insert into root.iotdb.d1(timestamp,s2,s3) values(250,10.0,true)",
+ "insert into root.iotdb.d1(timestamp,s3) values(207,false)",
+ "insert into root.cluster.d0(timestamp,s0) values(20,106)",
+ "insert into root.cluster.d0(timestamp,s0,s1) values(14,107,'108')",
+ "insert into root.cluster.d1.g0(timestamp,s0) values(1,110)",
+ "insert into root.cluster.d0(timestamp,s0) values(200,1006)",
+ "insert into root.cluster.d0(timestamp,s0,s1) values(1004,1007,'1080')",
+ "insert into root.cluster.d1.g0(timestamp,s0) values(1000,910)",
+ "insert into root.vehicle.d0(timestamp,s0) values(209,130)",
+ "insert into root.vehicle.d0(timestamp,s0,s1) values(206,131,'132')",
+ "insert into root.vehicle.d0(timestamp,s1) values(70,'33')",
+ "insert into root.vehicle.d1(timestamp,s2) values(204,14.0)",
+ "insert into root.vehicle.d1(timestamp,s2,s3) values(29,135.0,false)",
+ "insert into root.vehicle.d1(timestamp,s3) values(14,false)",
+ "insert into root.test.d0(timestamp,s0) values(19,136)",
+ "insert into root.test.d0(timestamp,s0,s1) values(7,137,'138')",
+ "insert into root.test.d0(timestamp,s1) values(30,'139')",
+ "insert into root.test.d1.g0(timestamp,s0) values(4,150)",
+ "insert into root.test.d0(timestamp,s0) values(1900,1316)",
+ "insert into root.test.d0(timestamp,s0,s1) values(700,1307,'1038')",
+ "insert into root.test.d0(timestamp,s1) values(3000,'1309')",
+ "insert into root.test.d1.g0(timestamp,s0) values(400,1050)"
+ };
+
+ private static final String[] querys1 ={
+ "select * from root.vehicle",
+ "select * from root.test",
+ "select * from root.vehicle,root.test where time = 11 or time = 12",
+ "select * from root.vehicle,root.test where d0.s0 > 10 and d0.s1 < 301 or time = 12",
+ "select * from root"
+ };
+ private static final String[] querys2 ={
+ "select * from root.vehicle",
+ "select * from root.test",
+ "select * from root.vehicle,root.test where time = 11 or time = 16",
+ "select * from root.vehicle,root.test where d0.s0 > 10 and d0.s1 < 301 or time = 20",
+ "select * from root"
+ };
+ private static final String[] querys3 ={
+ "select * from root.vehicle",
+ "select * from root.test",
+ "select * from root.cluster",
+ "select * from root.vehicle,root.test where time = 11 or time = 14",
+ "select * from root.vehicle,root.test where d0.s0 > 0 and d0.s1 < 1001 or time = 14",
+ "select * from root"
+ };
+
+ private Map<Integer, List<String>> queryCorrentResults = new HashMap<>();
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.closeMemControl();
+ CLUSTER_CONFIG.createAllPath();
+ server = Server.getInstance();
+ server.start();
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ EnvironmentUtils.cleanEnv();
+ }
+
+ private void initCorrectResults1(){
+ queryCorrentResults.clear();
+ queryCorrentResults.put(0, new ArrayList<>());
+ queryCorrentResults.put(1, new ArrayList<>());
+ queryCorrentResults.put(2, new ArrayList<>());
+ queryCorrentResults.put(3, new ArrayList<>());
+ queryCorrentResults.put(4, new ArrayList<>());
+ List<String> firstQueryRes = queryCorrentResults.get(0);
+ firstQueryRes.add("10,100,null,null,null");
+ firstQueryRes.add("11,null,null,104.0,null");
+ firstQueryRes.add("12,101,102,null,null");
+ firstQueryRes.add("15,null,null,105.0,true");
+ firstQueryRes.add("17,null,null,null,false");
+ firstQueryRes.add("19,null,103,null,null");
+ firstQueryRes.add("20,1000,null,null,null");
+ firstQueryRes.add("21,null,null,1004.0,null");
+ firstQueryRes.add("22,1001,1002,null,null");
+ firstQueryRes.add("25,null,null,1005.0,true");
+ firstQueryRes.add("27,null,null,null,true");
+ firstQueryRes.add("29,null,1003,null,null");
+
+ List<String> secondQueryRes = queryCorrentResults.get(1);
+ secondQueryRes.add("1,null,null,110");
+ secondQueryRes.add("10,106,null,1100");
+ secondQueryRes.add("14,107,108,null");
+ secondQueryRes.add("16,null,109,null");
+ secondQueryRes.add("30,1006,null,null");
+ secondQueryRes.add("34,1007,1008,null");
+ secondQueryRes.add("36,null,1090,null");
+
+ List<String> thirdQueryRes = queryCorrentResults.get(2);
+ thirdQueryRes.add("11,null,null,104.0,null,null,null,null");
+ thirdQueryRes.add("12,101,102,null,null,null,null,null");
+
+ List<String> forthQueryRes = queryCorrentResults.get(3);
+ forthQueryRes.add("12,101,102,null,null,null,null,null");
+
+ List<String> fifthQueryRes = queryCorrentResults.get(4);
+ fifthQueryRes.add("1,null,null,null,null,null,null,110");
+ fifthQueryRes.add("10,100,null,null,null,106,null,1100");
+ fifthQueryRes.add("11,null,null,104.0,null,null,null,null");
+ fifthQueryRes.add("12,101,102,null,null,null,null,null");
+ fifthQueryRes.add("14,null,null,null,null,107,108,null");
+ fifthQueryRes.add("15,null,null,105.0,true,null,null,null");
+ fifthQueryRes.add("16,null,null,null,null,null,109,null");
+ fifthQueryRes.add("17,null,null,null,false,null,null,null");
+ fifthQueryRes.add("19,null,103,null,null,null,null,null");
+ fifthQueryRes.add("20,1000,null,null,null,null,null,null");
+ fifthQueryRes.add("21,null,null,1004.0,null,null,null,null");
+ fifthQueryRes.add("22,1001,1002,null,null,null,null,null");
+ fifthQueryRes.add("25,null,null,1005.0,true,null,null,null");
+ fifthQueryRes.add("27,null,null,null,true,null,null,null");
+ fifthQueryRes.add("29,null,1003,null,null,null,null,null");
+ fifthQueryRes.add("30,null,null,null,null,1006,null,null");
+ fifthQueryRes.add("34,null,null,null,null,1007,1008,null");
+ fifthQueryRes.add("36,null,null,null,null,null,1090,null");
+ }
+
+ private void initCorrectResults2(){
+ queryCorrentResults.clear();
+ queryCorrentResults.put(0, new ArrayList<>());
+ queryCorrentResults.put(1, new ArrayList<>());
+ queryCorrentResults.put(2, new ArrayList<>());
+ queryCorrentResults.put(3, new ArrayList<>());
+ queryCorrentResults.put(4, new ArrayList<>());
+ List<String> firstQueryRes = queryCorrentResults.get(0);
+ firstQueryRes.add("6,120,null,null,null");
+ firstQueryRes.add("9,null,123,null,null");
+ firstQueryRes.add("10,100,null,null,null");
+ firstQueryRes.add("11,null,null,104.0,null");
+ firstQueryRes.add("12,101,102,null,null");
+ firstQueryRes.add("14,null,null,1024.0,null");
+ firstQueryRes.add("15,null,null,105.0,true");
+ firstQueryRes.add("16,128,null,null,null");
+ firstQueryRes.add("17,null,null,null,false");
+ firstQueryRes.add("18,189,198,null,null");
+ firstQueryRes.add("19,null,103,null,null");
+ firstQueryRes.add("20,1000,null,null,null");
+ firstQueryRes.add("21,null,null,1004.0,null");
+ firstQueryRes.add("22,1001,1002,null,null");
+ firstQueryRes.add("25,null,null,1005.0,true");
+ firstQueryRes.add("27,null,null,null,true");
+ firstQueryRes.add("29,null,1003,1205.0,true");
+ firstQueryRes.add("33,null,null,null,true");
+ firstQueryRes.add("38,121,122,null,null");
+ firstQueryRes.add("99,null,1234,null,null");
+
+ List<String> secondQueryRes = queryCorrentResults.get(1);
+ secondQueryRes.add("1,null,null,110");
+ secondQueryRes.add("2,null,1209,null");
+ secondQueryRes.add("4,null,null,330");
+ secondQueryRes.add("8,127,128,null");
+ secondQueryRes.add("10,106,null,1100");
+ secondQueryRes.add("14,107,108,430");
+ secondQueryRes.add("15,126,null,null");
+ secondQueryRes.add("16,null,109,null");
+ secondQueryRes.add("20,null,129,null");
+ secondQueryRes.add("30,1006,null,null");
+ secondQueryRes.add("34,1007,1008,null");
+ secondQueryRes.add("36,null,1090,null");
+ secondQueryRes.add("80,427,528,null");
+ secondQueryRes.add("150,426,null,null");
+
+ List<String> thirdQueryRes = queryCorrentResults.get(2);
+ thirdQueryRes.add("11,null,null,104.0,null,null,null,null");
+ thirdQueryRes.add("16,128,null,null,null,null,109,null");
+
+ List<String> forthQueryRes = queryCorrentResults.get(3);
+ forthQueryRes.add("20,1000,null,null,null,null,129,null");
+
+ List<String> fifthQueryRes = queryCorrentResults.get(4);
+ fifthQueryRes.add("1,null,null,null,null,null,null,110");
+ fifthQueryRes.add("2,null,null,null,null,null,1209,null");
+ fifthQueryRes.add("4,null,null,null,null,null,null,330");
+ fifthQueryRes.add("6,120,null,null,null,null,null,null");
+ fifthQueryRes.add("8,null,null,null,null,127,128,null");
+ fifthQueryRes.add("9,null,123,null,null,null,null,null");
+ fifthQueryRes.add("10,100,null,null,null,106,null,1100");
+ fifthQueryRes.add("11,null,null,104.0,null,null,null,null");
+ fifthQueryRes.add("12,101,102,null,null,null,null,null");
+ fifthQueryRes.add("14,null,null,1024.0,null,107,108,430");
+ fifthQueryRes.add("15,null,null,105.0,true,126,null,null");
+ fifthQueryRes.add("16,128,null,null,null,null,109,null");
+ fifthQueryRes.add("17,null,null,null,false,null,null,null");
+ fifthQueryRes.add("18,189,198,null,null,null,null,null");
+ fifthQueryRes.add("19,null,103,null,null,null,null,null");
+ fifthQueryRes.add("20,1000,null,null,null,null,129,null");
+ fifthQueryRes.add("21,null,null,1004.0,null,null,null,null");
+ fifthQueryRes.add("22,1001,1002,null,null,null,null,null");
+ fifthQueryRes.add("25,null,null,1005.0,true,null,null,null");
+ fifthQueryRes.add("27,null,null,null,true,null,null,null");
+ fifthQueryRes.add("29,null,1003,1205.0,true,null,null,null");
+ fifthQueryRes.add("30,null,null,null,null,1006,null,null");
+ fifthQueryRes.add("33,null,null,null,true,null,null,null");
+ fifthQueryRes.add("34,null,null,null,null,1007,1008,null");
+ fifthQueryRes.add("36,null,null,null,null,null,1090,null");
+ fifthQueryRes.add("38,121,122,null,null,null,null,null");
+ fifthQueryRes.add("80,null,null,null,null,427,528,null");
+ fifthQueryRes.add("99,null,1234,null,null,null,null,null");
+ fifthQueryRes.add("150,null,null,null,null,426,null,null");
+ }
+
+ private void initCorrectResults3(){
+ queryCorrentResults.clear();
+ queryCorrentResults.put(0, new ArrayList<>());
+ queryCorrentResults.put(1, new ArrayList<>());
+ queryCorrentResults.put(2, new ArrayList<>());
+ queryCorrentResults.put(3, new ArrayList<>());
+ queryCorrentResults.put(4, new ArrayList<>());
+ queryCorrentResults.put(5, new ArrayList<>());
+ List<String> zeroQueryRes = queryCorrentResults.get(0);
+ zeroQueryRes.add("14,null,null,null,false");
+ zeroQueryRes.add("20,1000,null,null,null");
+ zeroQueryRes.add("21,null,null,1004.0,null");
+ zeroQueryRes.add("22,1001,1002,null,null");
+ zeroQueryRes.add("25,null,null,1005.0,true");
+ zeroQueryRes.add("27,null,null,null,true");
+ zeroQueryRes.add("29,null,1003,135.0,false");
+ zeroQueryRes.add("33,null,null,null,true");
+ zeroQueryRes.add("38,121,122,null,null");
+ zeroQueryRes.add("70,null,33,null,null");
+ zeroQueryRes.add("99,null,1234,null,null");
+ zeroQueryRes.add("204,null,null,14.0,null");
+ zeroQueryRes.add("206,131,132,null,null");
+ zeroQueryRes.add("209,130,null,null,null");
+
+ List<String> firstQueryRes = queryCorrentResults.get(1);
+ firstQueryRes.add("4,null,null,150");
+ firstQueryRes.add("7,137,138,null");
+ firstQueryRes.add("19,136,null,null");
+ firstQueryRes.add("20,null,129,null");
+ firstQueryRes.add("30,1006,139,null");
+ firstQueryRes.add("34,1007,1008,null");
+ firstQueryRes.add("36,null,1090,null");
+ firstQueryRes.add("80,427,528,null");
+ firstQueryRes.add("150,426,null,null");
+ firstQueryRes.add("400,null,null,1050");
+ firstQueryRes.add("700,1307,1038,null");
+ firstQueryRes.add("1900,1316,null,null");
+ firstQueryRes.add("3000,null,1309,null");
+
+ List<String> secondQueryRes = queryCorrentResults.get(2);
+ secondQueryRes.add("1,null,null,110");
+ secondQueryRes.add("14,107,108,null");
+ secondQueryRes.add("20,106,null,null");
+ secondQueryRes.add("200,1006,null,null");
+ secondQueryRes.add("1000,null,null,910");
+ secondQueryRes.add("1004,1007,1080,null");
+
+ List<String> thirdQueryRes = queryCorrentResults.get(3);
+ thirdQueryRes.add("14,null,null,null,false,null,null,null");
+
+ List<String> forthQueryRes = queryCorrentResults.get(4);
+ forthQueryRes.add("14,null,null,null,false,null,null,null");
+
+ List<String> fifthQueryRes = queryCorrentResults.get(5);
+ fifthQueryRes.add("1,null,null,404.0,null,null,null,null,null,null,null,null,null,null,110");
+ fifthQueryRes.add("3,100,null,null,null,null,null,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("4,null,null,null,null,null,null,null,null,null,null,150,null,null,null");
+ fifthQueryRes.add("7,null,null,null,null,null,null,null,null,137,138,null,null,null,null");
+ fifthQueryRes.add("14,null,null,null,null,null,null,null,false,null,null,null,107,108,null");
+ fifthQueryRes.add("19,null,null,null,null,null,null,null,null,136,null,null,null,null,null");
+ fifthQueryRes.add("20,null,null,null,null,1000,null,null,null,null,129,null,106,null,null");
+ fifthQueryRes.add("21,null,null,104.0,null,null,null,1004.0,null,null,null,null,null,null,null");
+ fifthQueryRes.add("22,101,102,null,null,1001,1002,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("24,null,103,null,null,null,null,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("25,null,null,105.0,true,null,null,1005.0,true,null,null,null,null,null,null");
+ fifthQueryRes.add("27,null,null,null,false,null,null,null,true,null,null,null,null,null,null");
+ fifthQueryRes.add("29,null,null,null,null,null,1003,135.0,false,null,null,null,null,null,null");
+ fifthQueryRes.add("30,1000,null,null,null,null,null,null,null,1006,139,null,null,null,null");
+ fifthQueryRes.add("33,null,null,null,null,null,null,null,true,null,null,null,null,null,null");
+ fifthQueryRes.add("34,null,null,null,null,null,null,null,null,1007,1008,null,null,null,null");
+ fifthQueryRes.add("36,null,null,null,null,null,null,null,null,null,1090,null,null,null,null");
+ fifthQueryRes.add("38,null,null,null,null,121,122,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("44,null,103,null,null,null,null,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("70,null,null,null,null,null,33,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("80,null,null,null,null,null,null,null,null,427,528,null,null,null,null");
+ fifthQueryRes.add("99,null,null,null,null,null,1234,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("150,null,null,null,null,null,null,null,null,426,null,null,null,null,null");
+ fifthQueryRes.add("200,null,null,null,null,null,null,null,null,null,null,null,1006,null,null");
+ fifthQueryRes.add("202,101,102,null,null,null,null,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("204,null,null,null,null,null,null,14.0,null,null,null,null,null,null,null");
+ fifthQueryRes.add("206,null,null,null,null,131,132,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("207,null,null,null,false,null,null,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("209,null,null,null,null,130,null,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("250,null,null,10.0,true,null,null,null,null,null,null,null,null,null,null");
+ fifthQueryRes.add("400,null,null,null,null,null,null,null,null,null,null,1050,null,null,null");
+ fifthQueryRes.add("700,null,null,null,null,null,null,null,null,1307,1038,null,null,null,null");
+ fifthQueryRes.add("1000,null,null,null,null,null,null,null,null,null,null,null,null,null,910");
+ fifthQueryRes.add("1004,null,null,null,null,null,null,null,null,null,null,null,1007,1080,null");
+ fifthQueryRes.add("1900,null,null,null,null,null,null,null,null,1316,null,null,null,null,null");
+ fifthQueryRes.add("3000,null,null,null,null,null,null,null,null,null,1309,null,null,null,null");
+ }
+
+ @Test
+ public void testClusterQueryWithLargeData() throws Exception {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + URL, "root", "root")) {
+ Statement statement = connection.createStatement();
+
+ //first round
+ insertData(connection, createSQLs1, insertSQLs1);
+ initCorrectResults1();
+ for(int i =0 ; i < querys1.length; i++) {
+ String queryStatement = querys1[i];
+ boolean hasResultSet = statement.execute(queryStatement);
+ assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ IoTDBResultMetadata resultSetMetaData = (IoTDBResultMetadata) resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
+ List<String> correctResult = queryCorrentResults.get(i);
+ int count = 0;
+ while (resultSet.next()) {
+ String correctRow = correctResult.get(count++);
+ StringBuilder rowRecordBuilder = new StringBuilder();
+ for (int j = 1; j < columnCount; j++) {
+ rowRecordBuilder.append(resultSet.getString(j)).append(",");
+ }
+ rowRecordBuilder.append(resultSet.getString(columnCount));
+ assertEquals(correctRow, rowRecordBuilder.toString());
+ }
+ }
+
+ // second round
+ insertData(connection, new String[]{}, insertSqls2);
+ initCorrectResults2();
+ for(int i =0 ; i < querys2.length; i++) {
+ String queryStatement = querys2[i];
+ boolean hasResultSet = statement.execute(queryStatement);
+ assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ IoTDBResultMetadata resultSetMetaData = (IoTDBResultMetadata) resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
+ List<String> correctResult = queryCorrentResults.get(i);
+ int count = 0;
+ while (resultSet.next()) {
+ String correctRow = correctResult.get(count++);
+ StringBuilder rowRecordBuilder = new StringBuilder();
+ for (int j = 1; j < columnCount; j++) {
+ rowRecordBuilder.append(resultSet.getString(j)).append(",");
+ }
+ rowRecordBuilder.append(resultSet.getString(columnCount));
+ assertEquals(correctRow, rowRecordBuilder.toString());
+ }
+ }
+
+ // third round
+ insertData(connection, createSQLs3, insertSQLs3);
+ initCorrectResults3();
+ for(int i =0 ; i < querys3.length; i++) {
+ String queryStatement = querys3[i];
+ boolean hasResultSet = statement.execute(queryStatement);
+ assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ IoTDBResultMetadata resultSetMetaData = (IoTDBResultMetadata) resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
+ List<String> correctResult = queryCorrentResults.get(i);
+ int count = 0;
+ while (resultSet.next()) {
+ String correctRow = correctResult.get(count++);
+ StringBuilder rowRecordBuilder = new StringBuilder();
+ for (int j = 1; j < columnCount; j++) {
+ rowRecordBuilder.append(resultSet.getString(j)).append(",");
+ }
+ rowRecordBuilder.append(resultSet.getString(columnCount));
+ assertEquals(correctRow, rowRecordBuilder.toString());
+ }
+ }
+
+ statement.close();
+ }
+ }
+}