You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/09 09:10:23 UTC

[incubator-iotdb] branch master updated: [IOTDB-755] Fix count bug (#1335)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a7ac8c1  [IOTDB-755] Fix count bug (#1335)
a7ac8c1 is described below

commit a7ac8c195745d1c36f0756086107e25427dd6f9b
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Tue Jun 9 17:10:09 2020 +0800

    [IOTDB-755] Fix count bug (#1335)
    
    * fix count bug in align by device
---
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   4 +-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  24 +++--
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  71 +++++++++----
 .../db/query/dataset/AlignByDeviceDataSet.java     |  11 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   2 +-
 .../IoTDBAggregationAlignByDeviceIT.java           | 113 +++++++++++++++++++++
 6 files changed, 192 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 6113424..fd53ec3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -283,7 +283,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
       throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
-      IOException {
+      IOException, MetadataException {
     QueryDataSet queryDataSet;
     if (queryPlan instanceof AlignByDevicePlan) {
       queryDataSet = getAlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
@@ -315,7 +315,7 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   protected AlignByDeviceDataSet getAlignByDeviceDataSet(AlignByDevicePlan plan,
-      QueryContext context, IQueryRouter router) {
+      QueryContext context, IQueryRouter router) throws MetadataException {
     return new AlignByDeviceDataSet(plan, context, router);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 3edd8b2..5fde38f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -30,11 +30,15 @@ public class AlignByDevicePlan extends QueryPlan {
   private List<String> measurements; // to record result measurement columns, e.g. temperature, status, speed
   // to check data type consistency for the same name sensor of different devices
   private List<String> devices;
-  private Map<String, TSDataType> measurementDataTypeMap;
+  // to record the datatype of the column in the result set
+  private Map<String, TSDataType> columnDataTypeMap;
   private Map<String, IExpression> deviceToFilterMap;
   // to record different kinds of measurement
   private Map<String, MeasurementType> measurementTypeMap;
 
+  // to record the real type of the measurement
+  private Map<String, TSDataType> measurementDataTypeMap;
+
   private GroupByTimePlan groupByTimePlan;
   private FillQueryPlan fillQueryPlan;
   private AggregationPlan aggregationPlan;
@@ -59,13 +63,13 @@ public class AlignByDevicePlan extends QueryPlan {
     return devices;
   }
 
-  public void setMeasurementDataTypeMap(
-      Map<String, TSDataType> measurementDataTypeMap) {
-    this.measurementDataTypeMap = measurementDataTypeMap;
+  public void setColumnDataTypeMap(
+      Map<String, TSDataType> columnDataTypeMap) {
+    this.columnDataTypeMap = columnDataTypeMap;
   }
 
-  public Map<String, TSDataType> getMeasurementDataTypeMap() {
-    return measurementDataTypeMap;
+  public Map<String, TSDataType> getColumnDataTypeMap() {
+    return columnDataTypeMap;
   }
 
   public Map<String, IExpression> getDeviceToFilterMap() {
@@ -85,6 +89,14 @@ public class AlignByDevicePlan extends QueryPlan {
     this.measurementTypeMap = measurementTypeMap;
   }
 
+  public Map<String, TSDataType> getMeasurementDataTypeMap() {
+    return measurementDataTypeMap;
+  }
+
+  public void setMeasurementDataTypeMap(Map<String, TSDataType> measurementDataTypeMap) {
+    this.measurementDataTypeMap = measurementDataTypeMap;
+  }
+
   public GroupByTimePlan getGroupByTimePlan() {
     return groupByTimePlan;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 644b99b..cff982c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -47,9 +47,9 @@ import org.apache.iotdb.db.qp.logical.sys.CreateTimeSeriesOperator;
 import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator;
 import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
 import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
+import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
-import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator;
 import org.apache.iotdb.db.qp.logical.sys.MoveFileOperator;
@@ -61,9 +61,17 @@ import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTTLOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan;
@@ -72,10 +80,10 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurationPlanType;
 import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
-import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.MergePlan;
 import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -91,14 +99,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
 
 /**
  * Used to convert logical operator to physical plan
  */
 public class PhysicalGenerator {
-  private static Logger logger = LoggerFactory.getLogger(PhysicalGenerator.class);
 
   public PhysicalPlan transformToPhysicalPlan(Operator operator) throws QueryProcessException {
     List<Path> paths;
@@ -291,9 +296,25 @@ public class PhysicalGenerator {
 
   }
 
-  protected List<TSDataType> getSeriesTypes(List<String> paths,
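+  /**
+   * Gets, for each path, both the result-set column type and the measurement's real type.
+   *
+   * @return pair.left holds the result-set column types (recomputed for the aggregation when
+   * one is given); pair.right holds the real data types of the measurements
+   */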
+  protected Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(List<String> paths,
       String aggregation) throws MetadataException {
-    return SchemaUtils.getSeriesTypesByString(paths, aggregation);
+    List<TSDataType> measurementDataTypes = SchemaUtils.getSeriesTypesByString(paths, null);
+    // if the aggregation function is null, the type of column in result set
+    // is equal to the real type of the measurement
+    if (aggregation == null) {
+      return new Pair<>(measurementDataTypes, measurementDataTypes);
+    } else {
+      // if the aggregation function is not null,
+      // we should recalculate the type of column in result set
+      List<TSDataType> columnDataTypes = SchemaUtils.getSeriesTypesByString(paths, aggregation);
+      return new Pair<>(columnDataTypes, measurementDataTypes);
+    }
   }
 
   protected List<TSDataType> getSeriesTypes(List<Path> paths) throws MetadataException {
@@ -341,7 +362,8 @@ public class PhysicalGenerator {
 
       if (queryOperator.getLevel() >= 0) {
         for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
-          if (!SQLConstant.COUNT.equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
+          if (!SQLConstant.COUNT
+              .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
             throw new QueryProcessException("group by level only support count now.");
           }
         }
@@ -357,12 +379,13 @@ public class PhysicalGenerator {
       ((FillQueryPlan) queryPlan).setFillType(queryOperator.getFillTypes());
     } else if (queryOperator.hasAggregation()) {
       queryPlan = new AggregationPlan();
-      ((AggregationPlan)queryPlan).setLevel(queryOperator.getLevel());
+      ((AggregationPlan) queryPlan).setLevel(queryOperator.getLevel());
       ((AggregationPlan) queryPlan)
           .setAggregations(queryOperator.getSelectOperator().getAggregations());
       if (queryOperator.getLevel() >= 0) {
         for (int i = 0; i < queryOperator.getSelectOperator().getAggregations().size(); i++) {
-          if (!SQLConstant.COUNT.equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
+          if (!SQLConstant.COUNT
+              .equals(queryOperator.getSelectOperator().getAggregations().get(i))) {
             throw new QueryProcessException("group by level only support count now.");
           }
         }
@@ -387,7 +410,7 @@ public class PhysicalGenerator {
       } else if (queryPlan instanceof FillQueryPlan) {
         alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
       } else if (queryPlan instanceof AggregationPlan) {
-        if (((AggregationPlan)queryPlan).getLevel() >= 0) {
+        if (((AggregationPlan) queryPlan).getLevel() >= 0) {
           throw new QueryProcessException("group by level does not support align by device now.");
         }
         alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
@@ -402,8 +425,12 @@ public class PhysicalGenerator {
       // to record result measurement columns
       List<String> measurements = new ArrayList<>();
       // to check the same measurement of different devices having the same datatype
-      Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+      // record the data type of each column of result set
+      Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
       Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
+
+      // to record the real type of the corresponding measurement
+      Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
       List<Path> paths = new ArrayList<>();
 
       for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
@@ -436,7 +463,11 @@ public class PhysicalGenerator {
             String aggregation =
                 originAggregations != null && !originAggregations.isEmpty()
                     ? originAggregations.get(i) : null;
-            List<TSDataType> dataTypes = getSeriesTypes(actualPaths, aggregation);
+
+            Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths,
+                aggregation);
+            List<TSDataType> columnDataTypes = pair.left;
+            List<TSDataType> measurementDataTypes = pair.right;
             for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
               Path path = new Path(actualPaths.get(pathIdx));
 
@@ -449,16 +480,17 @@ public class PhysicalGenerator {
               } else {
                 measurementChecked = path.getMeasurement();
               }
-              TSDataType dataType = dataTypes.get(pathIdx);
-              if (measurementDataTypeMap.containsKey(measurementChecked)) {
-                if (!dataType.equals(measurementDataTypeMap.get(measurementChecked))) {
+              TSDataType columnDataType = columnDataTypes.get(pathIdx);
+              if (columnDataTypeMap.containsKey(measurementChecked)) {
+                if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
                   throw new QueryProcessException(
                       "The data types of the same measurement column should be the same across "
                           + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
                           + "SQL document.");
                 }
               } else {
-                measurementDataTypeMap.put(measurementChecked, dataType);
+                columnDataTypeMap.put(measurementChecked, columnDataType);
+                measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
               }
 
               // update measurementSetOfGivenSuffix and Normal measurement
@@ -498,8 +530,9 @@ public class PhysicalGenerator {
       // assigns to alignByDevicePlan
       alignByDevicePlan.setMeasurements(measurements);
       alignByDevicePlan.setDevices(devices);
-      alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
+      alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
       alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+      alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
       alignByDevicePlan.setPaths(paths);
 
       // get deviceToFilterMap
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 438f3a8..2988800 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -62,7 +62,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private List<String> devices;
   private Map<String, IExpression> deviceToFilterMap;
   private Map<String, MeasurementType> measurementTypeMap;
-  private Map<String, TSDataType> measurementDataTpeMap;
+  // record the real type of the corresponding measurement
+  private Map<String, TSDataType> measurementDataTypeMap;
 
   private GroupByTimePlan groupByTimePlan;
   private FillQueryPlan fillQueryPlan;
@@ -76,12 +77,12 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private List<String> executeColumns;
 
   public AlignByDeviceDataSet(AlignByDevicePlan alignByDevicePlan, QueryContext context,
-      IQueryRouter queryRouter) {
+      IQueryRouter queryRouter) throws MetadataException {
     super(null, alignByDevicePlan.getDataTypes());
 
     this.measurements = alignByDevicePlan.getMeasurements();
     this.devices = alignByDevicePlan.getDevices();
-    this.measurementDataTpeMap = alignByDevicePlan.getMeasurementDataTypeMap();
+    this.measurementDataTypeMap = alignByDevicePlan.getMeasurementDataTypeMap();
     this.queryRouter = queryRouter;
     this.context = context;
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
@@ -127,7 +128,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       List<Path> executePaths = new ArrayList<>();
       List<TSDataType> tsDataTypes = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (String column : measurementDataTpeMap.keySet()) {
+      for (String column : measurementDataTypeMap.keySet()) {
         String measurement = column;
         if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
           measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
@@ -138,7 +139,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         if (measurementOfGivenDevice.contains(measurement)) {
           executeColumns.add(column);
           executePaths.add(new Path(currentDevice, measurement));
-          tsDataTypes.add(measurementDataTpeMap.get(column));
+          tsDataTypes.add(measurementDataTypeMap.get(column));
         }
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 7ae24a9..d17457e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -760,7 +760,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
 
     Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
-    Map<String, TSDataType> measurementDataTypeMap = plan.getMeasurementDataTypeMap();
+    Map<String, TSDataType> measurementDataTypeMap = plan.getColumnDataTypeMap();
 
     // build column header with constant and non exist column and deduplication
     List<String> measurements = plan.getMeasurements();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
new file mode 100644
index 0000000..3655fe8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationAlignByDeviceIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBAggregationAlignByDeviceIT {
+
+  private static final String[] dataSet = new String[]{
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 1.0, 1.1, 1.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 2.0, 2.1, 2.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 3.0, 3.1, 3.2)",
+      "flush",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d1(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(1, 11.0, 11.1, 11.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(2, 12.0, 12.1, 12.2)",
+      "INSERT INTO root.sg1.d2(timestamp,s1,s2,s3) values(3, 13.0, 13.1, 13.2)",
+  };
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test() throws SQLException {
+    String[] retArray = new String[]{"root.sg1.d1,3,3,3", "root.sg1.d2,3,3,3",};
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement
+          .execute("SELECT count(*) from root align by device");
+
+      Assert.assertTrue(hasResultSet);
+
+      int cnt;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        cnt = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString("Device") + "," + resultSet.getString(count("s1"))
+                  + "," + resultSet.getString(count("s2")) + "," + resultSet
+                  .getString(count("s3"));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(retArray.length, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // Loads dataSet through JDBC; any failure aborts the test instead of being silently ignored.
+  private void prepareData() {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (String sql : dataSet) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      // without this, a failed insert would only show up later as a confusing count mismatch
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}