You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 09:34:46 UTC
[incubator-iotdb] 02/04: fix a serve bug,
concurrent hashmap modification
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 78f7800d7b4e6bc9d979fff58fabf7cf4f7ace9a
Author: lta <li...@163.com>
AuthorDate: Tue May 21 17:33:04 2019 +0800
fix a serve bug, concurrent hashmap modification
---
.../apache/iotdb/cluster/config/ClusterConfig.java | 2 +-
.../cluster/query/executor/ClusterQueryRouter.java | 1 +
.../ClusterRpcSingleQueryManager.java | 22 +-
.../querynode/ClusterLocalQueryManager.java | 7 +-
.../querynode/ClusterLocalSingleQueryManager.java | 10 +-
.../iotdb/cluster/integration/IOTDBGroupByIT.java | 497 +++++++++++++++++++++
.../integration/IoTDBAggregationSmallDataIT.java | 1 -
7 files changed, 525 insertions(+), 15 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 0e6472d..45df39f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -102,7 +102,7 @@ public class ClusterConfig {
* then it sends requests to other nodes in the cluster. This parameter represents the maximum
* timeout for these requests. The unit is milliseconds.
**/
- private int qpTaskTimeout = 5000;
+ private int qpTaskTimeout = 500000;
/**
* Number of virtual nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index f9d32f5..54e0df5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -168,6 +168,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
.optimize(expression, selectedSeries);
try {
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+// queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter(
jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
groupByEngine.initGroupBy(context, aggres, optimizedExpression);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 8cc4ccd..c9dc701 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -131,12 +132,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
private void initSeriesReader(int readDataConsistencyLevel)
throws RaftConnectionException, IOException {
// Init all series with data group of select series,if filter series has the same data group, init them together.
- for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
+ Iterator<Map.Entry<String, SelectSeriesGroupEntity>> selectIterator = selectSeriesGroupEntityMap
+ .entrySet().iterator();
+ while (selectIterator.hasNext()) {
+ Entry<String, SelectSeriesGroupEntity> entry = selectIterator.next();
String groupId = entry.getKey();
SelectSeriesGroupEntity selectEntity = entry.getValue();
QueryPlan queryPlan = selectEntity.getQueryPlan();
if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
- LOGGER.debug("Init series reader for group id {} from remote node." , groupId);
+ LOGGER.debug("Init series reader for group id {} from remote node.", groupId);
Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
List<Filter> filterList = new ArrayList<>();
@@ -153,15 +157,18 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
.createClusterSeriesReader(groupId, request, this);
handleInitReaderResponse(groupId, allQueryPlan, response);
} else {
- LOGGER.debug("Init series reader for group id {} locally." , groupId);
+ LOGGER.debug("Init series reader for group id {} locally.", groupId);
dataGroupUsage.add(groupId);
- selectSeriesGroupEntityMap.remove(groupId);
+ selectIterator.remove();
filterSeriesGroupEntityMap.remove(groupId);
}
}
//Init series reader with data groups of filter series, which don't exist in data groups list of select series.
- for (Entry<String, FilterSeriesGroupEntity> entry : filterSeriesGroupEntityMap.entrySet()) {
+ Iterator<Map.Entry<String, FilterSeriesGroupEntity>> filterIterator = filterSeriesGroupEntityMap
+ .entrySet().iterator();
+ while (filterIterator.hasNext()) {
+ Entry<String, FilterSeriesGroupEntity> entry = filterIterator.next();
String groupId = entry.getKey();
if (!selectSeriesGroupEntityMap.containsKey(groupId) && !QPExecutorUtils
.canHandleQueryByGroupId(groupId)) {
@@ -177,7 +184,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
handleInitReaderResponse(groupId, allQueryPlan, response);
} else if (!selectSeriesGroupEntityMap.containsKey(groupId)) {
dataGroupUsage.add(groupId);
- filterSeriesGroupEntityMap.remove(groupId);
+ filterIterator.remove();
}
}
}
@@ -268,7 +275,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
/**
* Handle response of fetching data, and add batch data to corresponding reader.
*/
- private void handleFetchDataByTimestampResponseForSelectPaths(String groupId, BasicQueryDataResponse response) {
+ private void handleFetchDataByTimestampResponseForSelectPaths(String groupId,
+ BasicQueryDataResponse response) {
List<BatchData> batchDataList = response.getSeriesBatchData();
List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
.getSelectSeriesReaders();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 4e09af8..c83e2a2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -59,7 +59,12 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
TASK_ID_MAP_JOB_ID.put(taskId, jobId);
ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
- return localQueryManager.createSeriesReader(request);
+ try {
+ return localQueryManager.createSeriesReader(request);
+ }catch (Exception e){
+ e.printStackTrace();
+ return null;
+ }
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 25adbf5..e9c0dcb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -492,11 +492,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
@Override
public void run() {
- try {
- close();
- } catch (FileNodeManagerException e) {
- LOGGER.error(e.getMessage());
- }
+// try {
+//// close();
+// } catch (FileNodeManagerException e) {
+// LOGGER.error(e.getMessage());
+// }
}
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
new file mode 100644
index 0000000..265d509
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.integration;
+
+import static org.apache.iotdb.cluster.integration.Constant.count;
+import static org.apache.iotdb.cluster.integration.Constant.first;
+import static org.apache.iotdb.cluster.integration.Constant.last;
+import static org.apache.iotdb.cluster.integration.Constant.max_time;
+import static org.apache.iotdb.cluster.integration.Constant.max_value;
+import static org.apache.iotdb.cluster.integration.Constant.mean;
+import static org.apache.iotdb.cluster.integration.Constant.min_time;
+import static org.apache.iotdb.cluster.integration.Constant.min_value;
+import static org.apache.iotdb.cluster.integration.Constant.sum;
+import static org.apache.iotdb.cluster.utils.Utils.insertData;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IOTDBGroupByIT {
+
+ private Server server;
+ private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+ private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+ CLUSTER_CONFIG.getPort());
+
+ private static String[] createSqls = new String[]{
+ "SET STORAGE GROUP TO root.ln.wf01.wt01",
+ "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN"};
+ private static String[] insertSqls = new String[]{
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(1, 1.1, false, 11)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(2, 2.2, true, 22)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(3, 3.3, false, 33 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(4, 4.4, false, 44)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(5, 5.5, false, 55)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(100, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(150, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(200, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(250, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(300, 500.5, false, 550)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(10, 10.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(20, 20.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(30, 30.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(40, 40.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(50, 50.5, false, 550)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(500, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(510, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(520, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(530, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(540, 500.5, false, 550)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(580, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(590, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(600, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(610, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(620, 500.5, false, 550)",
+ };
+
+ private static final String TIMESTAMP_STR = "Time";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.closeMemControl();
+ CLUSTER_CONFIG.createAllPath();
+ server = Server.getInstance();
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ server.start();
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ insertSql();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort());
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void countSumMeanTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "2,1,4.4,4.4",
+ "5,3,35.8,11.933333333333332",
+ "25,1,30.3,30.3",
+ "50,1,50.5,50.5",
+ "65,0,0.0,null",
+ "85,1,100.1,100.1",
+ "105,0,0.0,null",
+ "125,0,0.0,null",
+ "145,1,200.2,200.2",
+ "310,0,0.0,null"
+ };
+ String[] retArray2 = new String[]{
+ "2,2,7.7,3.85",
+ "5,3,35.8,11.933333333333332",
+ "25,1,30.3,30.3",
+ "50,1,50.5,50.5",
+ "65,0,0.0,null",
+ "85,1,100.1,100.1",
+ "105,0,0.0,null",
+ "125,0,0.0,null",
+ "145,1,200.2,200.2",
+ "310,0,0.0,null"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+// Statement statement = connection.createStatement();
+// boolean hasResultSet = statement.execute(
+// "select count(temperature), sum(temperature), mean(temperature) from "
+// + "root.ln.wf01.wt01 where time > 3 "
+// + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+//
+// Assert.assertTrue(hasResultSet);
+// ResultSet resultSet = statement.getResultSet();
+// int cnt = 0;
+// while (resultSet.next()) {
+// String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+// .getString(count("root.ln.wf01.wt01.temperature")) + "," +
+// resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
+// .getString(mean("root.ln.wf01.wt01.temperature"));
+// Assert.assertEquals(retArray1[cnt], ans);
+// cnt++;
+// }
+// Assert.assertEquals(retArray1.length, cnt);
+// statement.close();
+
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute(
+ "select count(temperature), sum(temperature), mean(temperature) from "
+ + "root.ln.wf01.wt01 where temperature > 3 "
+ + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(count("root.ln.wf01.wt01.temperature")) + "," +
+ resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
+ .getString(mean("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray2[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray2.length, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void maxMinValeTimeTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "2,4.4,4.4,4,4",
+ "5,20.2,5.5,20,5",
+ "25,30.3,30.3,30,30",
+ "50,50.5,50.5,50,50",
+ "65,null,null,null,null",
+ "85,100.1,100.1,100,100",
+ "105,null,null,null,null",
+ "125,null,null,null,null",
+ "145,200.2,200.2,150,150",
+ "310,null,null,null,null"
+ };
+ String[] retArray2 = new String[]{
+ "2,4.4,3.3,4,3",
+ "5,20.2,5.5,20,5",
+ "25,30.3,30.3,30,30",
+ "50,50.5,50.5,50,50",
+ "65,null,null,null,null",
+ "85,100.1,100.1,100,100",
+ "105,null,null,null,null",
+ "125,null,null,null,null",
+ "145,200.2,200.2,150,150",
+ "310,null,null,null,null"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute(
+ "select max_value(temperature), min_value(temperature), max_time(temperature), "
+ + "min_time(temperature) from root.ln.wf01.wt01 where time > 3 "
+ + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(max_value("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(min_value("root.ln.wf01.wt01.temperature")) + ","
+ + resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute(
+ "select max_value(temperature), min_value(temperature), max_time(temperature), "
+ + "min_time(temperature) from root.ln.wf01.wt01 where temperature > 3 "
+ + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(max_value("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(min_value("root.ln.wf01.wt01.temperature")) + ","
+ + resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray2[cnt], ans);
+ cnt++;
+ //System.out.println(ans);
+ }
+ Assert.assertEquals(retArray2.length, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void firstLastTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "2,4.4,4.4",
+ "5,20.2,5.5",
+ "25,30.3,30.3",
+ "50,50.5,50.5",
+ "65,null,null",
+ "85,100.1,100.1",
+ "105,null,null",
+ "125,null,null",
+ "145,200.2,200.2",
+ "310,null,null"
+ };
+ String[] retArray2 = new String[]{
+ "2,4.4,3.3",
+ "5,20.2,5.5",
+ "25,30.3,30.3",
+ "50,50.5,50.5",
+ "65,null,null",
+ "85,100.1,100.1",
+ "105,null,null",
+ "125,null,null",
+ "145,200.2,200.2",
+ "310,null,null"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute(
+ "select last(temperature), first(temperature) from root.ln.wf01.wt01 where time > 3 "
+ + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(last("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
+ System.out.println(ans);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute(
+ "select first(temperature), last(temperature) from root.ln.wf01.wt01 "
+ + "where temperature > 3 "
+ + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(last("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
+ System.out.println(ans);
+ Assert.assertEquals(retArray2[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray2.length, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void largeIntervalTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "2,4.4,4,20,4",
+ "30,30.3,16,610,30",
+ "620,500.5,1,620,620"
+ };
+ String[] retArray2 = new String[]{
+ "2,3.3,5,20,3",
+ "30,30.3,16,610,30",
+ "620,500.5,1,620,620"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute(
+ "select min_value(temperature), count(temperature), max_time(temperature), "
+ + "min_time(temperature) from root.ln.wf01.wt01 where time > 3 GROUP BY "
+ + "(590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125], [155, 550], [540, 680])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(min_value("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(count("root.ln.wf01.wt01.temperature")) + "," +
+ resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute(
+ "select min_value(temperature), count (temperature), max_time(temperature), "
+ + "min_time(temperature) from root.ln.wf01.wt01 where temperature > 3 GROUP BY "
+ + "(590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125], [155, 550],[540, 680])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(min_value("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(count("root.ln.wf01.wt01.temperature")) + ","
+ + resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray2[cnt], ans);
+ cnt++;
+ //System.out.println(ans);
+ }
+ Assert.assertEquals(retArray2.length, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void smallPartitionTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "50,100.1,50.5,150.6",
+ "615,500.5,500.5,500.5"
+
+ };
+ String[] retArray2 = new String[]{
+ "50,100.1,50.5,150.6",
+ "585,null,null,0.0",
+ "590,500.5,200.2,700.7"
+ };
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute(
+ "select last(temperature), first(temperature), sum(temperature) from "
+ + "root.ln.wf01.wt01 where time > 3 "
+ + "GROUP BY (80ms, 30,[50,100], [615, 650])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(last("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+ System.out.println(ans);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute(
+ "select first(temperature), last(temperature), sum(temperature) from "
+ + "root.ln.wf01.wt01 where temperature > 3 "
+ + "GROUP BY (80ms, 30,[50,100], [585,590], [615, 650])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(last("root.ln.wf01.wt01.temperature"))
+ + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
+ + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+ System.out.println(ans);
+ Assert.assertEquals(retArray2[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray2.length, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private void insertSql() {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root")) {
+ insertData(connection, createSqls, insertSqls);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 162c5ac..77afbea 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.jdbc.Config;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
/**