You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 09:34:46 UTC

[incubator-iotdb] 02/04: fix a serve bug, concurrent hashmap modification

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 78f7800d7b4e6bc9d979fff58fabf7cf4f7ace9a
Author: lta <li...@163.com>
AuthorDate: Tue May 21 17:33:04 2019 +0800

    fix a serve bug, concurrent hashmap modification
---
 .../apache/iotdb/cluster/config/ClusterConfig.java |   2 +-
 .../cluster/query/executor/ClusterQueryRouter.java |   1 +
 .../ClusterRpcSingleQueryManager.java              |  22 +-
 .../querynode/ClusterLocalQueryManager.java        |   7 +-
 .../querynode/ClusterLocalSingleQueryManager.java  |  10 +-
 .../iotdb/cluster/integration/IOTDBGroupByIT.java  | 497 +++++++++++++++++++++
 .../integration/IoTDBAggregationSmallDataIT.java   |   1 -
 7 files changed, 525 insertions(+), 15 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 0e6472d..45df39f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -102,7 +102,7 @@ public class ClusterConfig {
    * then it sends requests to other nodes in the cluster. This parameter represents the maximum
    * timeout for these requests. The unit is milliseconds.
    **/
-  private int qpTaskTimeout = 5000;
+  private int qpTaskTimeout = 500000;
 
   /**
    * Number of virtual nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index f9d32f5..54e0df5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -168,6 +168,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
         .optimize(expression, selectedSeries);
     try {
       if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+//        queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
         ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter(
             jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
         groupByEngine.initGroupBy(context, aggres, optimizedExpression);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 8cc4ccd..c9dc701 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -131,12 +132,15 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   private void initSeriesReader(int readDataConsistencyLevel)
       throws RaftConnectionException, IOException {
     // Init all series with data group of select series,if filter series has the same data group, init them together.
-    for (Entry<String, SelectSeriesGroupEntity> entry : selectSeriesGroupEntityMap.entrySet()) {
+    Iterator<Map.Entry<String, SelectSeriesGroupEntity>> selectIterator = selectSeriesGroupEntityMap
+        .entrySet().iterator();
+    while (selectIterator.hasNext()) {
+      Entry<String, SelectSeriesGroupEntity> entry = selectIterator.next();
       String groupId = entry.getKey();
       SelectSeriesGroupEntity selectEntity = entry.getValue();
       QueryPlan queryPlan = selectEntity.getQueryPlan();
       if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
-        LOGGER.debug("Init series reader for group id {} from remote node." , groupId);
+        LOGGER.debug("Init series reader for group id {} from remote node.", groupId);
         Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
         allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
         List<Filter> filterList = new ArrayList<>();
@@ -153,15 +157,18 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
             .createClusterSeriesReader(groupId, request, this);
         handleInitReaderResponse(groupId, allQueryPlan, response);
       } else {
-        LOGGER.debug("Init series reader for group id {} locally." , groupId);
+        LOGGER.debug("Init series reader for group id {} locally.", groupId);
         dataGroupUsage.add(groupId);
-        selectSeriesGroupEntityMap.remove(groupId);
+        selectIterator.remove();
         filterSeriesGroupEntityMap.remove(groupId);
       }
     }
 
     //Init series reader with data groups of filter series, which don't exist in data groups list of select series.
-    for (Entry<String, FilterSeriesGroupEntity> entry : filterSeriesGroupEntityMap.entrySet()) {
+    Iterator<Map.Entry<String, FilterSeriesGroupEntity>> filterIterator = filterSeriesGroupEntityMap
+        .entrySet().iterator();
+    while (filterIterator.hasNext()) {
+      Entry<String, FilterSeriesGroupEntity> entry = filterIterator.next();
       String groupId = entry.getKey();
       if (!selectSeriesGroupEntityMap.containsKey(groupId) && !QPExecutorUtils
           .canHandleQueryByGroupId(groupId)) {
@@ -177,7 +184,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
         handleInitReaderResponse(groupId, allQueryPlan, response);
       } else if (!selectSeriesGroupEntityMap.containsKey(groupId)) {
         dataGroupUsage.add(groupId);
-        filterSeriesGroupEntityMap.remove(groupId);
+        filterIterator.remove();
       }
     }
   }
@@ -268,7 +275,8 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
   /**
    * Handle response of fetching data, and add batch data to corresponding reader.
    */
-  private void handleFetchDataByTimestampResponseForSelectPaths(String groupId, BasicQueryDataResponse response) {
+  private void handleFetchDataByTimestampResponseForSelectPaths(String groupId,
+      BasicQueryDataResponse response) {
     List<BatchData> batchDataList = response.getSeriesBatchData();
     List<ClusterSelectSeriesReader> selectSeriesReaders = selectSeriesGroupEntityMap.get(groupId)
         .getSelectSeriesReaders();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 4e09af8..c83e2a2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -59,7 +59,12 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
     TASK_ID_MAP_JOB_ID.put(taskId, jobId);
     ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
     SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
-    return localQueryManager.createSeriesReader(request);
+    try {
+      return localQueryManager.createSeriesReader(request);
+    }catch (Exception e){
+      e.printStackTrace();
+      return null;
+    }
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 25adbf5..e9c0dcb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -492,11 +492,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
 
     @Override
     public void run() {
-      try {
-        close();
-      } catch (FileNodeManagerException e) {
-        LOGGER.error(e.getMessage());
-      }
+//      try {
+////        close();
+//      } catch (FileNodeManagerException e) {
+//        LOGGER.error(e.getMessage());
+//      }
     }
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
new file mode 100644
index 0000000..265d509
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.integration;
+
+import static org.apache.iotdb.cluster.integration.Constant.count;
+import static org.apache.iotdb.cluster.integration.Constant.first;
+import static org.apache.iotdb.cluster.integration.Constant.last;
+import static org.apache.iotdb.cluster.integration.Constant.max_time;
+import static org.apache.iotdb.cluster.integration.Constant.max_value;
+import static org.apache.iotdb.cluster.integration.Constant.mean;
+import static org.apache.iotdb.cluster.integration.Constant.min_time;
+import static org.apache.iotdb.cluster.integration.Constant.min_value;
+import static org.apache.iotdb.cluster.integration.Constant.sum;
+import static org.apache.iotdb.cluster.utils.Utils.insertData;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IOTDBGroupByIT {
+
+  private Server server;
+  private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+  private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+      CLUSTER_CONFIG.getPort());
+
+  private static String[] createSqls = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt01",
+      "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN"};
+  private static String[] insertSqls = new String[]{
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(1, 1.1, false, 11)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(2, 2.2, true, 22)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(3, 3.3, false, 33 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(4, 4.4, false, 44)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(5, 5.5, false, 55)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(100, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(150, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(200, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(250, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(300, 500.5, false, 550)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(10, 10.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(20, 20.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(30, 30.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(40, 40.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(50, 50.5, false, 550)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(500, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(510, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(520, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(530, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(540, 500.5, false, 550)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(580, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(590, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(600, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(610, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(620, 500.5, false, 550)",
+  };
+
+  private static final String TIMESTAMP_STR = "Time";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    CLUSTER_CONFIG.createAllPath();
+    server = Server.getInstance();
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    server.start();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    insertSql();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    server.stop();
+    QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort());
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void countSumMeanTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,1,4.4,4.4",
+        "5,3,35.8,11.933333333333332",
+        "25,1,30.3,30.3",
+        "50,1,50.5,50.5",
+        "65,0,0.0,null",
+        "85,1,100.1,100.1",
+        "105,0,0.0,null",
+        "125,0,0.0,null",
+        "145,1,200.2,200.2",
+        "310,0,0.0,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,2,7.7,3.85",
+        "5,3,35.8,11.933333333333332",
+        "25,1,30.3,30.3",
+        "50,1,50.5,50.5",
+        "65,0,0.0,null",
+        "85,1,100.1,100.1",
+        "105,0,0.0,null",
+        "125,0,0.0,null",
+        "145,1,200.2,200.2",
+        "310,0,0.0,null"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+//      Statement statement = connection.createStatement();
+//      boolean hasResultSet = statement.execute(
+//          "select count(temperature), sum(temperature), mean(temperature) from "
+//              + "root.ln.wf01.wt01 where time > 3 "
+//              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+//
+//      Assert.assertTrue(hasResultSet);
+//      ResultSet resultSet = statement.getResultSet();
+//      int cnt = 0;
+//      while (resultSet.next()) {
+//        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+//            .getString(count("root.ln.wf01.wt01.temperature")) + "," +
+//            resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
+//            .getString(mean("root.ln.wf01.wt01.temperature"));
+//        Assert.assertEquals(retArray1[cnt], ans);
+//        cnt++;
+//      }
+//      Assert.assertEquals(retArray1.length, cnt);
+//      statement.close();
+
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute(
+          "select count(temperature), sum(temperature), mean(temperature) from "
+              + "root.ln.wf01.wt01 where temperature > 3 "
+              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(count("root.ln.wf01.wt01.temperature")) + "," +
+            resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
+            .getString(mean("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void maxMinValeTimeTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4.4,4,4",
+        "5,20.2,5.5,20,5",
+        "25,30.3,30.3,30,30",
+        "50,50.5,50.5,50,50",
+        "65,null,null,null,null",
+        "85,100.1,100.1,100,100",
+        "105,null,null,null,null",
+        "125,null,null,null,null",
+        "145,200.2,200.2,150,150",
+        "310,null,null,null,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,4.4,3.3,4,3",
+        "5,20.2,5.5,20,5",
+        "25,30.3,30.3,30,30",
+        "50,50.5,50.5,50,50",
+        "65,null,null,null,null",
+        "85,100.1,100.1,100,100",
+        "105,null,null,null,null",
+        "125,null,null,null,null",
+        "145,200.2,200.2,150,150",
+        "310,null,null,null,null"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute(
+          "select max_value(temperature), min_value(temperature), max_time(temperature), "
+              + "min_time(temperature) from root.ln.wf01.wt01 where time > 3 "
+              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(max_value("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(min_value("root.ln.wf01.wt01.temperature")) + ","
+            + resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select max_value(temperature), min_value(temperature), max_time(temperature), "
+              + "min_time(temperature) from root.ln.wf01.wt01 where temperature > 3 "
+              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(max_value("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(min_value("root.ln.wf01.wt01.temperature")) + ","
+            + resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+        //System.out.println(ans);
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void firstLastTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4.4",
+        "5,20.2,5.5",
+        "25,30.3,30.3",
+        "50,50.5,50.5",
+        "65,null,null",
+        "85,100.1,100.1",
+        "105,null,null",
+        "125,null,null",
+        "145,200.2,200.2",
+        "310,null,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,4.4,3.3",
+        "5,20.2,5.5",
+        "25,30.3,30.3",
+        "50,50.5,50.5",
+        "65,null,null",
+        "85,100.1,100.1",
+        "105,null,null",
+        "125,null,null",
+        "145,200.2,200.2",
+        "310,null,null"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute(
+          "select last(temperature), first(temperature) from root.ln.wf01.wt01 where time > 3 "
+              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select first(temperature), last(temperature) from root.ln.wf01.wt01 "
+              + "where temperature > 3 "
+              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void largeIntervalTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4,20,4",
+        "30,30.3,16,610,30",
+        "620,500.5,1,620,620"
+    };
+    String[] retArray2 = new String[]{
+        "2,3.3,5,20,3",
+        "30,30.3,16,610,30",
+        "620,500.5,1,620,620"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute(
+          "select min_value(temperature), count(temperature), max_time(temperature), "
+              + "min_time(temperature) from root.ln.wf01.wt01 where time > 3 GROUP BY "
+              + "(590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125], [155, 550], [540, 680])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(min_value("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(count("root.ln.wf01.wt01.temperature")) + "," +
+            resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select min_value(temperature), count (temperature), max_time(temperature), "
+              + "min_time(temperature) from root.ln.wf01.wt01 where temperature > 3 GROUP BY "
+              + "(590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125], [155, 550],[540, 680])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(min_value("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(count("root.ln.wf01.wt01.temperature")) + ","
+            + resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+        //System.out.println(ans);
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void smallPartitionTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "50,100.1,50.5,150.6",
+        "615,500.5,500.5,500.5"
+
+    };
+    String[] retArray2 = new String[]{
+        "50,100.1,50.5,150.6",
+        "585,null,null,0.0",
+        "590,500.5,200.2,700.7"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute(
+          "select last(temperature), first(temperature), sum(temperature) from "
+              + "root.ln.wf01.wt01 where time > 3 "
+              + "GROUP BY (80ms, 30,[50,100], [615, 650])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
+            + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select first(temperature), last(temperature), sum(temperature) from "
+              + "root.ln.wf01.wt01 where temperature > 3 "
+              + "GROUP BY (80ms, 30,[50,100], [585,590], [615, 650])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
+            + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private void insertSql() {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root")) {
+      insertData(connection, createSqls, insertSqls);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 162c5ac..77afbea 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**