You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 13:02:10 UTC

[incubator-iotdb] 01/03: add group by IT and add ClusterNullableBatchData to handle null TimeValuePair

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 626f2a39c9cb2e71c14d0a0fba9ad8c90d0b8d47
Author: lta <li...@163.com>
AuthorDate: Tue May 21 20:58:57 2019 +0800

    add group by IT and add ClusterNullableBatchData to handle null TimeValuePair
---
 .../apache/iotdb/cluster/config/ClusterConfig.java |   2 +-
 .../cluster/qp/executor/NonQueryExecutor.java      |   2 -
 ...atchData.java => ClusterNullableBatchData.java} |  42 ++++++---
 .../ClusterGroupByDataSetWithOnlyTimeFilter.java   |  12 ++-
 .../ClusterGroupByDataSetWithTimeGenerator.java    | 101 +++++++++++++++++++++
 .../cluster/query/executor/ClusterQueryRouter.java |   2 +-
 .../querynode/ClusterLocalQueryManager.java        |   7 +-
 .../querynode/ClusterLocalSingleQueryManager.java  |  10 +-
 .../ClusterFillSelectSeriesBatchReader.java        |  11 ++-
 ...lusterGroupBySelectSeriesBatchReaderEntity.java |  10 +-
 .../query/utils/ClusterTimeValuePairUtils.java     |  34 ++++++-
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |   7 +-
 .../iotdb/cluster/integration/IOTDBGroupByIT.java  |  59 ++++++------
 .../integration/IoTDBAggregationSmallDataIT.java   |  15 ---
 .../groupby/GroupByWithValueFilterDataSet.java     |  10 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |   4 +-
 16 files changed, 228 insertions(+), 100 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 45df39f..627f561 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -102,7 +102,7 @@ public class ClusterConfig {
    * then it sends requests to other nodes in the cluster. This parameter represents the maximum
    * timeout for these requests. The unit is milliseconds.
    **/
-  private int qpTaskTimeout = 500000;
+  private int qpTaskTimeout = 1000;
 
   /**
    * Number of virtual nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 75fc3a8..ac9bddf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -352,8 +352,6 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     return RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response);
   }
 
-
-
   /**
    * Async handle task by QPTask and leader id.
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
similarity index 54%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
index 2d17d0c..8315a04 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
@@ -18,48 +18,62 @@
  */
 package org.apache.iotdb.cluster.query.common;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
 /**
- * <code>FillBatchData</code> is a self-defined data structure which is used in cluster query
- * process of fill type, which only contains one TimeValuePair and value can be null.
+ * <code>ClusterNullableBatchData</code> is a self-defined data structure which is used in cluster
+ * query process of fill type and group by type, which may contain <code>null</code> in list of
+ * TimeValuePair.
  */
-public class FillBatchData extends BatchData {
+public class ClusterNullableBatchData extends BatchData {
 
-  private TimeValuePair timeValuePair;
-  private boolean isUsed;
+  private List<TimeValuePair> timeValuePairList;
+  private int index;
 
-  public FillBatchData(TimeValuePair timeValuePair, boolean isUsed) {
-    this.timeValuePair = timeValuePair;
-    this.isUsed = isUsed;
+  public ClusterNullableBatchData() {
+    this.timeValuePairList = new ArrayList<>();
+    this.index = 0;
   }
 
   @Override
   public boolean hasNext() {
-    return !isUsed;
+    return index < timeValuePairList.size();
   }
 
   @Override
   public void next() {
-    isUsed = true;
+    index++;
   }
 
   @Override
   public long currentTime() {
-    return timeValuePair.getTimestamp();
+    rangeCheckForTime(index);
+    return timeValuePairList.get(index).getTimestamp();
   }
 
   @Override
   public Object currentValue() {
-    if (!isUsed) {
-      return timeValuePair.getValue() == null ? null : timeValuePair.getValue().getValue();
+    if (index < length()) {
+      return timeValuePairList.get(index).getValue() == null ? null
+          : timeValuePairList.get(index).getValue().getValue();
     } else {
       return null;
     }
   }
 
+  @Override
+  public int length() {
+    return timeValuePairList.size();
+  }
+
   public TimeValuePair getTimeValuePair() {
-    return isUsed ? null : timeValuePair;
+    return index < length() ? timeValuePairList.get(index) : null;
+  }
+
+  public void addTimeValuePair(TimeValuePair timeValuePair){
+    timeValuePairList.add(timeValuePair);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
index 98460e4..ef5386d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
@@ -64,7 +64,7 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime
       List<Path> paths, long unit, long origin,
       List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) {
     super(jobId, paths, unit, origin, mergedIntervals);
-    this.queryManager  =queryManager;
+    this.queryManager = queryManager;
     this.readersOfSelectedSeries = new ArrayList<>();
   }
 
@@ -132,10 +132,14 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime
     RowRecord record = new RowRecord(startTime);
     for (int i = 0; i < functions.size(); i++) {
       IPointReader reader = readersOfSelectedSeries.get(i);
-      if(reader != null){
+      if (reader != null) {
         TimeValuePair timeValuePair = reader.next();
-        record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
-      }else {
+        if (timeValuePair == null) {
+          record.addField(new Field(null));
+        } else {
+          record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
+        }
+      } else {
         AggreResultData res;
         try {
           res = nextSeries(i);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
index 00f2d88..89ed1b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
@@ -30,11 +31,13 @@ import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -88,4 +91,102 @@ public class ClusterGroupByDataSetWithTimeGenerator extends GroupByWithValueFilt
         .createReadersByTimestampOfSelectedPaths(selectedSeries, context, queryManager,
             selectSeriesDataTypes);
   }
+
+  @Override
+  public RowRecord next() throws IOException {
+    if (!hasCachedTimeInterval) {
+      throw new IOException("need to call hasNext() before calling next()"
+          + " in GroupByWithOnlyTimeFilterDataSet.");
+    }
+    hasCachedTimeInterval = false;
+    for (AggregateFunction function : functions) {
+      function.init();
+    }
+
+    long[] timestampArray = new long[timestampFetchSize];
+    int timeArrayLength = 0;
+    if (hasCachedTimestamp) {
+      if (timestamp < endTime) {
+        hasCachedTimestamp = false;
+        timestampArray[timeArrayLength++] = timestamp;
+      } else {
+        return constructRowRecord();
+      }
+    }
+
+    while (timestampGenerator.hasNext()) {
+      // construct timestamp array
+      timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength);
+
+      fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray);
+
+      // cal result using timestamp array
+      for (int i = 0; i < selectedSeries.size(); i++) {
+        functions.get(i).calcAggregationUsingTimestamps(
+            timestampArray, timeArrayLength, allDataReaderList.get(i));
+      }
+
+      timeArrayLength = 0;
+      // judge if it's end
+      if (timestamp >= endTime) {
+        hasCachedTimestamp = true;
+        break;
+      }
+    }
+
+    // fetch select series data from remote node
+    fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray);
+
+    if (timeArrayLength > 0) {
+      // cal result using timestamp array
+      for (int i = 0; i < selectedSeries.size(); i++) {
+        functions.get(i).calcAggregationUsingTimestamps(
+            timestampArray, timeArrayLength, allDataReaderList.get(i));
+      }
+    }
+    return constructRowRecord();
+  }
+
+  /**
+   * Get select series batch data by batch timestamp
+   * @param timeArrayLength length of batch timestamp
+   * @param timestampArray timestamp array
+   */
+  private void fetchSelectDataFromRemoteNode(int timeArrayLength, long[] timestampArray)
+      throws IOException {
+    if(timeArrayLength != 0){
+      List<Long> batchTimestamp = new ArrayList<>();
+      for(int i = 0 ; i < timeArrayLength; i++){
+        batchTimestamp.add(timestampArray[i]);
+      }
+
+      try {
+        queryManager.fetchBatchDataByTimestampForAllSelectPaths(batchTimestamp);
+      } catch (
+          RaftConnectionException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * construct an array of timestamps for one batch of a group by partition calculating.
+   *
+   * @param timestampArray timestamp array
+   * @param timeArrayLength the current length of timestamp array
+   * @return time array length
+   */
+  private int constructTimeArrayForOneCal(long[] timestampArray, int timeArrayLength)
+      throws IOException {
+    for (int cnt = 1; cnt < timestampFetchSize && timestampGenerator.hasNext(); cnt++) {
+      timestamp = timestampGenerator.next();
+      if (timestamp < endTime) {
+        timestampArray[timeArrayLength++] = timestamp;
+      } else {
+        hasCachedTimestamp = true;
+        break;
+      }
+    }
+    return timeArrayLength;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 54e0df5..9f9dc41 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -168,7 +168,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
         .optimize(expression, selectedSeries);
     try {
       if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-//        queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
+        queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
         ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter(
             jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
         groupByEngine.initGroupBy(context, aggres, optimizedExpression);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index c83e2a2..4e09af8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -59,12 +59,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
     TASK_ID_MAP_JOB_ID.put(taskId, jobId);
     ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
     SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
-    try {
-      return localQueryManager.createSeriesReader(request);
-    }catch (Exception e){
-      e.printStackTrace();
-      return null;
-    }
+    return localQueryManager.createSeriesReader(request);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index e9c0dcb..25adbf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -492,11 +492,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
 
     @Override
     public void run() {
-//      try {
-////        close();
-//      } catch (FileNodeManagerException e) {
-//        LOGGER.error(e.getMessage());
-//      }
+      try {
+        close();
+      } catch (FileNodeManagerException e) {
+        LOGGER.error(e.getMessage());
+      }
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
index fadd92f..a16a220 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
@@ -19,8 +19,9 @@
 package org.apache.iotdb.cluster.query.reader.querynode;
 
 import java.io.IOException;
-import org.apache.iotdb.cluster.query.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
 import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -34,6 +35,12 @@ public class ClusterFillSelectSeriesBatchReader extends ClusterSelectSeriesBatch
 
   @Override
   public BatchData nextBatch() throws IOException {
-    return hasNext() ? new FillBatchData(reader.next(), false) : new FillBatchData(null, true);
+    if(hasNext()){
+      ClusterNullableBatchData batchData = new ClusterNullableBatchData();
+      batchData.addTimeValuePair(reader.next());
+      return batchData;
+    }else{
+      return new ClusterNullableBatchData();
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
index 3b7fabe..5c1d9d1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
@@ -23,7 +23,10 @@ import static org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSerie
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
+import org.apache.iotdb.cluster.query.utils.ClusterTimeValuePairUtils;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Field;
@@ -56,7 +59,7 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements
   public List<BatchData> nextBatchList() throws IOException {
     List<BatchData> batchDataList = new ArrayList<>(paths.size());
     for (int i = 0; i < paths.size(); i++) {
-      batchDataList.add(new BatchData(dataTypes.get(i), true));
+      batchDataList.add(new ClusterNullableBatchData());
     }
     int dataPointCount = 0;
     while (true) {
@@ -68,10 +71,9 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements
       long time = rowRecord.getTimestamp();
       List<Field> fieldList = rowRecord.getFields();
       for (int j = 0; j < paths.size(); j++) {
-        BatchData batchData = batchDataList.get(j);
+        ClusterNullableBatchData batchData = (ClusterNullableBatchData) batchDataList.get(j);
         Object value = fieldList.get(j).getObjectValue(dataTypes.get(j));
-        batchData.putTime(time);
-        batchData.putAnObject(value);
+        batchData.addTimeValuePair(fieldList.get(j).toString().equals("null") ? null : ClusterTimeValuePairUtils.getTimeValuePair(time, value,dataTypes.get(j)));
       }
     }
     return batchDataList;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index 7525368..d9c8d75 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iotdb.cluster.query.utils;
 
-import org.apache.iotdb.cluster.query.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 public class ClusterTimeValuePairUtils {
 
@@ -35,10 +39,32 @@ public class ClusterTimeValuePairUtils {
    * @return -given data's (time,value) pair
    */
   public static TimeValuePair getCurrentTimeValuePair(BatchData data) {
-    if (data instanceof FillBatchData){
-      return ((FillBatchData)data).getTimeValuePair();
-    }else{
+    if (data instanceof ClusterNullableBatchData) {
+      return ((ClusterNullableBatchData) data).getTimeValuePair();
+    } else {
       return TimeValuePairUtils.getCurrentTimeValuePair(data);
     }
   }
+
+  /**
+   * Get (time,value) pair according to data type
+   */
+  public static TimeValuePair getTimeValuePair(long time, Object v, TSDataType dataType) {
+    switch (dataType) {
+      case INT32:
+        return new TimeValuePair(time, new TsPrimitiveType.TsInt((int) v));
+      case INT64:
+        return new TimeValuePair(time, new TsPrimitiveType.TsLong((long) v));
+      case FLOAT:
+        return new TimeValuePair(time, new TsPrimitiveType.TsFloat((float) v));
+      case DOUBLE:
+        return new TimeValuePair(time, new TsPrimitiveType.TsDouble((double) v));
+      case TEXT:
+        return new TimeValuePair(time, new TsPrimitiveType.TsBinary((Binary) v));
+      case BOOLEAN:
+        return new TimeValuePair(time, new TsPrimitiveType.TsBoolean((boolean) v));
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(v));
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index 197c7eb..b4a2f25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.cluster.rpc.raft;
 
 import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.QueryTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
 
 /**
  * Handle the request and process the result as a client with the current node
@@ -31,7 +31,8 @@ public interface NodeAsClient {
 
   /**
    * Asynchronous processing requests
-   *  @param leader leader node of the target group
+   *
+   * @param leader leader node of the target group
    * @param qpTask single QPTask to be executed
    */
   void asyncHandleRequest(BasicRequest request, PeerId leader,
@@ -39,8 +40,8 @@ public interface NodeAsClient {
 
   /**
    * Synchronous processing requests
-   * @param peerId leader node of the target group
    *
+   * @param peerId leader node of the target group
    */
   QueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
index 265d509..0165bba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
@@ -120,7 +120,6 @@ public class IOTDBGroupByIT {
     EnvironmentUtils.closeMemControl();
     CLUSTER_CONFIG.createAllPath();
     server = Server.getInstance();
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
     server.start();
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
@@ -135,7 +134,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void countSumMeanTest() throws SQLException {
+  public void countSumMeanTest() {
     String[] retArray1 = new String[]{
         "2,1,4.4,4.4",
         "5,3,35.8,11.933333333333332",
@@ -162,30 +161,10 @@ public class IOTDBGroupByIT {
     };
     try (Connection connection = DriverManager.
         getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
-//      Statement statement = connection.createStatement();
-//      boolean hasResultSet = statement.execute(
-//          "select count(temperature), sum(temperature), mean(temperature) from "
-//              + "root.ln.wf01.wt01 where time > 3 "
-//              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
-//
-//      Assert.assertTrue(hasResultSet);
-//      ResultSet resultSet = statement.getResultSet();
-//      int cnt = 0;
-//      while (resultSet.next()) {
-//        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
-//            .getString(count("root.ln.wf01.wt01.temperature")) + "," +
-//            resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
-//            .getString(mean("root.ln.wf01.wt01.temperature"));
-//        Assert.assertEquals(retArray1[cnt], ans);
-//        cnt++;
-//      }
-//      Assert.assertEquals(retArray1.length, cnt);
-//      statement.close();
-
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute(
           "select count(temperature), sum(temperature), mean(temperature) from "
-              + "root.ln.wf01.wt01 where temperature > 3 "
+              + "root.ln.wf01.wt01 where time > 3 "
               + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
 
       Assert.assertTrue(hasResultSet);
@@ -196,6 +175,26 @@ public class IOTDBGroupByIT {
             .getString(count("root.ln.wf01.wt01.temperature")) + "," +
             resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
             .getString(mean("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select count(temperature), sum(temperature), mean(temperature) from "
+              + "root.ln.wf01.wt01 where temperature > 3 "
+              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(count("root.ln.wf01.wt01.temperature")) + "," +
+            resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
+            .getString(mean("root.ln.wf01.wt01.temperature"));
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
       }
@@ -209,7 +208,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void maxMinValeTimeTest() throws SQLException {
+  public void maxMinValeTimeTest() {
     String[] retArray1 = new String[]{
         "2,4.4,4.4,4,4",
         "5,20.2,5.5,20,5",
@@ -274,7 +273,6 @@ public class IOTDBGroupByIT {
             + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
-        //System.out.println(ans);
       }
       Assert.assertEquals(retArray2.length, cnt);
       statement.close();
@@ -286,7 +284,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void firstLastTest() throws SQLException {
+  public void firstLastTest() {
     String[] retArray1 = new String[]{
         "2,4.4,4.4",
         "5,20.2,5.5",
@@ -325,7 +323,6 @@ public class IOTDBGroupByIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray1[cnt], ans);
         cnt++;
       }
@@ -345,7 +342,6 @@ public class IOTDBGroupByIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
       }
@@ -359,7 +355,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void largeIntervalTest() throws SQLException {
+  public void largeIntervalTest() {
     String[] retArray1 = new String[]{
         "2,4.4,4,20,4",
         "30,30.3,16,610,30",
@@ -410,7 +406,6 @@ public class IOTDBGroupByIT {
             + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
-        //System.out.println(ans);
       }
       Assert.assertEquals(retArray2.length, cnt);
       statement.close();
@@ -422,7 +417,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void smallPartitionTest() throws SQLException {
+  public void smallPartitionTest() {
     String[] retArray1 = new String[]{
         "50,100.1,50.5,150.6",
         "615,500.5,500.5,500.5"
@@ -449,7 +444,6 @@ public class IOTDBGroupByIT {
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
             + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray1[cnt], ans);
         cnt++;
       }
@@ -470,7 +464,6 @@ public class IOTDBGroupByIT {
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
             + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
       }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 77afbea..056a70f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -294,7 +294,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans =
             resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0)) + ","
                 + resultSet.getString(last(d0s1));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -323,7 +322,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans =
             resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0)) + ","
                 + resultSet.getString(first(d0s1));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -352,7 +350,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0))
             + "," + resultSet.getString(sum(d0s1)) + "," + Math
             .round(resultSet.getDouble(sum(d0s2)));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -381,7 +378,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(mean(d0s0))
             + "," + Math.round(resultSet.getDouble(mean(d0s1))) + ","
             + Math.round(resultSet.getDouble(mean(d0s2)));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -404,7 +400,6 @@ public class IoTDBAggregationSmallDataIT {
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3),"
           + "count(s4) from root.vehicle.d0 where s2 >= 3.33");
-      // System.out.println(hasResultSet + "...");
       Assert.assertTrue(hasResultSet);
       ResultSet resultSet = statement.getResultSet();
       int cnt = 0;
@@ -412,7 +407,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0))
             + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2))
             + "," + resultSet.getString(count(d0s3)) + "," + resultSet.getString(count(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -444,7 +438,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(min_time(d0s0))
             + "," + resultSet.getString(min_time(d0s1)) + "," + resultSet.getString(min_time(d0s2))
             + "," + resultSet.getString(min_time(d0s3)) + "," + resultSet.getString(min_time(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
         Assert.assertEquals(1, cnt);
@@ -476,7 +469,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0))
             + "," + resultSet.getString(max_time(d0s1)) + "," + resultSet.getString(max_time(d0s2))
             + "," + resultSet.getString(max_time(d0s3)) + "," + resultSet.getString(max_time(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
@@ -510,7 +502,6 @@ public class IoTDBAggregationSmallDataIT {
             "," + resultSet.getString(min_value(d0s2))
             + "," + resultSet.getString(min_value(d0s3)) + ","
             + resultSet.getString(min_value(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
@@ -545,8 +536,6 @@ public class IoTDBAggregationSmallDataIT {
             .getString(max_value(d0s2))
             + "," + resultSet.getString(max_value(d0s3)) + "," + resultSet
             .getString(max_value(d0s4));
-        //System.out.println("============ " + ans);
-        //Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
       Assert.assertEquals(1, cnt);
@@ -568,13 +557,11 @@ public class IoTDBAggregationSmallDataIT {
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute(
           "select count(s0) from root.vehicle.d0 where s2 >= 3.33");
-      // System.out.println(hasResultSet + "...");
       Assert.assertTrue(hasResultSet);
       ResultSet resultSet = statement.getResultSet();
       int cnt = 0;
       while (resultSet.next()) {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0));
-        //System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
@@ -613,7 +600,6 @@ public class IoTDBAggregationSmallDataIT {
         (Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) {
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute("select * from root");
-      // System.out.println(hasResultSet + "...");
       if (hasResultSet) {
         ResultSet resultSet = statement.getResultSet();
         int cnt = 0;
@@ -621,7 +607,6 @@ public class IoTDBAggregationSmallDataIT {
           String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s0) + ","
               + resultSet.getString(d0s1) + "," + resultSet.getString(d0s2) + "," +
               resultSet.getString(d0s3) + "," + resultSet.getString(d1s0);
-          // System.out.println(ans);
           Assert.assertEquals(ans, retArray[cnt]);
           cnt++;
         }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 528b378..002c28f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -42,19 +42,21 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
 
   protected List<EngineReaderByTimeStamp> allDataReaderList;
   protected TimeGenerator timestampGenerator;
+
   /**
    * cached timestamp for next group by partition.
    */
-  private long timestamp;
+  protected long timestamp;
+
   /**
    * if this object has cached timestamp for next group by partition.
    */
-  private boolean hasCachedTimestamp;
+  protected boolean hasCachedTimestamp;
 
   /**
    * group by batch calculation size.
    */
-  private int timestampFetchSize;
+  protected int timestampFetchSize;
 
   /**
    * constructor.
@@ -152,7 +154,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
     return timeArrayLength;
   }
 
-  private RowRecord constructRowRecord() {
+  protected RowRecord constructRowRecord() {
     RowRecord record = new RowRecord(startTime);
     functions.forEach(function -> record.addField(getField(function.getResult())));
     return record;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index aeb789e..de52849 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -378,11 +378,11 @@ public class BatchData implements Serializable {
   /**
    * Checks if the given index is in range. If not, throws an appropriate runtime exception.
    */
-  private void rangeCheckForTime(int idx) {
+  protected void rangeCheckForTime(int idx) {
     if (idx < 0) {
       throw new IndexOutOfBoundsException("BatchData time range check, Index is negative: " + idx);
     }
-    if (idx >= timeLength) {
+    if (idx >= length()) {
       throw new IndexOutOfBoundsException(
-          "BatchData time range check, Index : " + idx + ". Length : " + timeLength);
+          "BatchData time range check, Index : " + idx + ". Length : " + length());
     }