You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 13:02:09 UTC

[incubator-iotdb] branch cluster updated (39c8865 -> 61e8452)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 39c8865  fix non-query bug: fail to execute when leader down
     new 626f2a3  add group by it and add ClusterNullableBatach data to handle null timevalue pair
     new 2a30289  Merge branch 'cluster' of github.com:apache/incubator-iotdb into cluster
     new 61e8452  remove useless code

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/iotdb/cluster/config/ClusterConfig.java |   2 +-
 .../cluster/qp/executor/NonQueryExecutor.java      |   3 -
 ...atchData.java => ClusterNullableBatchData.java} |  42 ++++++---
 .../ClusterGroupByDataSetWithOnlyTimeFilter.java   |  12 ++-
 .../ClusterGroupByDataSetWithTimeGenerator.java    | 101 +++++++++++++++++++++
 .../cluster/query/executor/ClusterQueryRouter.java |   2 +-
 .../querynode/ClusterLocalQueryManager.java        |   7 +-
 .../querynode/ClusterLocalSingleQueryManager.java  |  10 +-
 .../ClusterFillSelectSeriesBatchReader.java        |  11 ++-
 ...lusterGroupBySelectSeriesBatchReaderEntity.java |  10 +-
 .../query/utils/ClusterTimeValuePairUtils.java     |  34 ++++++-
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |   7 +-
 .../iotdb/cluster/integration/IOTDBGroupByIT.java  |  59 ++++++------
 .../integration/IoTDBAggregationSmallDataIT.java   |  15 ---
 .../groupby/GroupByWithValueFilterDataSet.java     |  10 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |   4 +-
 16 files changed, 228 insertions(+), 101 deletions(-)
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/common/{FillBatchData.java => ClusterNullableBatchData.java} (54%)


[incubator-iotdb] 01/03: add group by it and add ClusterNullableBatach data to handle null timevalue pair

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 626f2a39c9cb2e71c14d0a0fba9ad8c90d0b8d47
Author: lta <li...@163.com>
AuthorDate: Tue May 21 20:58:57 2019 +0800

    add group by it and add ClusterNullableBatach data to handle null timevalue pair
---
 .../apache/iotdb/cluster/config/ClusterConfig.java |   2 +-
 .../cluster/qp/executor/NonQueryExecutor.java      |   2 -
 ...atchData.java => ClusterNullableBatchData.java} |  42 ++++++---
 .../ClusterGroupByDataSetWithOnlyTimeFilter.java   |  12 ++-
 .../ClusterGroupByDataSetWithTimeGenerator.java    | 101 +++++++++++++++++++++
 .../cluster/query/executor/ClusterQueryRouter.java |   2 +-
 .../querynode/ClusterLocalQueryManager.java        |   7 +-
 .../querynode/ClusterLocalSingleQueryManager.java  |  10 +-
 .../ClusterFillSelectSeriesBatchReader.java        |  11 ++-
 ...lusterGroupBySelectSeriesBatchReaderEntity.java |  10 +-
 .../query/utils/ClusterTimeValuePairUtils.java     |  34 ++++++-
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |   7 +-
 .../iotdb/cluster/integration/IOTDBGroupByIT.java  |  59 ++++++------
 .../integration/IoTDBAggregationSmallDataIT.java   |  15 ---
 .../groupby/GroupByWithValueFilterDataSet.java     |  10 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java |   4 +-
 16 files changed, 228 insertions(+), 100 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 45df39f..627f561 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -102,7 +102,7 @@ public class ClusterConfig {
    * then it sends requests to other nodes in the cluster. This parameter represents the maximum
    * timeout for these requests. The unit is milliseconds.
    **/
-  private int qpTaskTimeout = 500000;
+  private int qpTaskTimeout = 1000;
 
   /**
    * Number of virtual nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 75fc3a8..ac9bddf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -352,8 +352,6 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     return RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response);
   }
 
-
-
   /**
    * Async handle task by QPTask and leader id.
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
similarity index 54%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
index 2d17d0c..8315a04 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
@@ -18,48 +18,62 @@
  */
 package org.apache.iotdb.cluster.query.common;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
 /**
- * <code>FillBatchData</code> is a self-defined data structure which is used in cluster query
- * process of fill type, which only contains one TimeValuePair and value can be null.
+ * <code>ClusterNullableBatchData</code> is a self-defined data structure which is used in cluster
+ * query process of fill type and group by type, which may contain <code>null</code> in list of
+ * TimeValuePair.
  */
-public class FillBatchData extends BatchData {
+public class ClusterNullableBatchData extends BatchData {
 
-  private TimeValuePair timeValuePair;
-  private boolean isUsed;
+  private List<TimeValuePair> timeValuePairList;
+  private int index;
 
-  public FillBatchData(TimeValuePair timeValuePair, boolean isUsed) {
-    this.timeValuePair = timeValuePair;
-    this.isUsed = isUsed;
+  public ClusterNullableBatchData() {
+    this.timeValuePairList = new ArrayList<>();
+    this.index = 0;
   }
 
   @Override
   public boolean hasNext() {
-    return !isUsed;
+    return index < timeValuePairList.size();
   }
 
   @Override
   public void next() {
-    isUsed = true;
+    index++;
   }
 
   @Override
   public long currentTime() {
-    return timeValuePair.getTimestamp();
+    rangeCheckForTime(index);
+    return timeValuePairList.get(index).getTimestamp();
   }
 
   @Override
   public Object currentValue() {
-    if (!isUsed) {
-      return timeValuePair.getValue() == null ? null : timeValuePair.getValue().getValue();
+    if (index < length()) {
+      return timeValuePairList.get(index).getValue() == null ? null
+          : timeValuePairList.get(index).getValue().getValue();
     } else {
       return null;
     }
   }
 
+  @Override
+  public int length() {
+    return timeValuePairList.size();
+  }
+
   public TimeValuePair getTimeValuePair() {
-    return isUsed ? null : timeValuePair;
+    return index < length() ? timeValuePairList.get(index) : null;
+  }
+
+  public void addTimeValuePair(TimeValuePair timeValuePair){
+    timeValuePairList.add(timeValuePair);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
index 98460e4..ef5386d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
@@ -64,7 +64,7 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime
       List<Path> paths, long unit, long origin,
       List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) {
     super(jobId, paths, unit, origin, mergedIntervals);
-    this.queryManager  =queryManager;
+    this.queryManager = queryManager;
     this.readersOfSelectedSeries = new ArrayList<>();
   }
 
@@ -132,10 +132,14 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime
     RowRecord record = new RowRecord(startTime);
     for (int i = 0; i < functions.size(); i++) {
       IPointReader reader = readersOfSelectedSeries.get(i);
-      if(reader != null){
+      if (reader != null) {
         TimeValuePair timeValuePair = reader.next();
-        record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
-      }else {
+        if (timeValuePair == null) {
+          record.addField(new Field(null));
+        } else {
+          record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
+        }
+      } else {
         AggreResultData res;
         try {
           res = nextSeries(i);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
index 00f2d88..89ed1b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
@@ -30,11 +31,13 @@ import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -88,4 +91,102 @@ public class ClusterGroupByDataSetWithTimeGenerator extends GroupByWithValueFilt
         .createReadersByTimestampOfSelectedPaths(selectedSeries, context, queryManager,
             selectSeriesDataTypes);
   }
+
+  @Override
+  public RowRecord next() throws IOException {
+    if (!hasCachedTimeInterval) {
+      throw new IOException("need to call hasNext() before calling next()"
+          + " in GroupByWithOnlyTimeFilterDataSet.");
+    }
+    hasCachedTimeInterval = false;
+    for (AggregateFunction function : functions) {
+      function.init();
+    }
+
+    long[] timestampArray = new long[timestampFetchSize];
+    int timeArrayLength = 0;
+    if (hasCachedTimestamp) {
+      if (timestamp < endTime) {
+        hasCachedTimestamp = false;
+        timestampArray[timeArrayLength++] = timestamp;
+      } else {
+        return constructRowRecord();
+      }
+    }
+
+    while (timestampGenerator.hasNext()) {
+      // construct timestamp array
+      timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength);
+
+      fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray);
+
+      // cal result using timestamp array
+      for (int i = 0; i < selectedSeries.size(); i++) {
+        functions.get(i).calcAggregationUsingTimestamps(
+            timestampArray, timeArrayLength, allDataReaderList.get(i));
+      }
+
+      timeArrayLength = 0;
+      // judge if it's end
+      if (timestamp >= endTime) {
+        hasCachedTimestamp = true;
+        break;
+      }
+    }
+
+    // fetch select series data from remote node
+    fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray);
+
+    if (timeArrayLength > 0) {
+      // cal result using timestamp array
+      for (int i = 0; i < selectedSeries.size(); i++) {
+        functions.get(i).calcAggregationUsingTimestamps(
+            timestampArray, timeArrayLength, allDataReaderList.get(i));
+      }
+    }
+    return constructRowRecord();
+  }
+
+  /**
+   * Get select series batch data by batch timestamp
+   * @param timeArrayLength length of batch timestamp
+   * @param timestampArray timestamp array
+   */
+  private void fetchSelectDataFromRemoteNode(int timeArrayLength, long[] timestampArray)
+      throws IOException {
+    if(timeArrayLength != 0){
+      List<Long> batchTimestamp = new ArrayList<>();
+      for(int i = 0 ; i < timeArrayLength; i++){
+        batchTimestamp.add(timestampArray[i]);
+      }
+
+      try {
+        queryManager.fetchBatchDataByTimestampForAllSelectPaths(batchTimestamp);
+      } catch (
+          RaftConnectionException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * construct an array of timestamps for one batch of a group by partition calculating.
+   *
+   * @param timestampArray timestamp array
+   * @param timeArrayLength the current length of timestamp array
+   * @return time array length
+   */
+  private int constructTimeArrayForOneCal(long[] timestampArray, int timeArrayLength)
+      throws IOException {
+    for (int cnt = 1; cnt < timestampFetchSize && timestampGenerator.hasNext(); cnt++) {
+      timestamp = timestampGenerator.next();
+      if (timestamp < endTime) {
+        timestampArray[timeArrayLength++] = timestamp;
+      } else {
+        hasCachedTimestamp = true;
+        break;
+      }
+    }
+    return timeArrayLength;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 54e0df5..9f9dc41 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -168,7 +168,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
         .optimize(expression, selectedSeries);
     try {
       if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-//        queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
+        queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
         ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter(
             jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
         groupByEngine.initGroupBy(context, aggres, optimizedExpression);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index c83e2a2..4e09af8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -59,12 +59,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
     TASK_ID_MAP_JOB_ID.put(taskId, jobId);
     ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
     SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
-    try {
-      return localQueryManager.createSeriesReader(request);
-    }catch (Exception e){
-      e.printStackTrace();
-      return null;
-    }
+    return localQueryManager.createSeriesReader(request);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index e9c0dcb..25adbf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -492,11 +492,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
 
     @Override
     public void run() {
-//      try {
-////        close();
-//      } catch (FileNodeManagerException e) {
-//        LOGGER.error(e.getMessage());
-//      }
+      try {
+        close();
+      } catch (FileNodeManagerException e) {
+        LOGGER.error(e.getMessage());
+      }
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
index fadd92f..a16a220 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
@@ -19,8 +19,9 @@
 package org.apache.iotdb.cluster.query.reader.querynode;
 
 import java.io.IOException;
-import org.apache.iotdb.cluster.query.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
 import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -34,6 +35,12 @@ public class ClusterFillSelectSeriesBatchReader extends ClusterSelectSeriesBatch
 
   @Override
   public BatchData nextBatch() throws IOException {
-    return hasNext() ? new FillBatchData(reader.next(), false) : new FillBatchData(null, true);
+    if(hasNext()){
+      ClusterNullableBatchData batchData = new ClusterNullableBatchData();
+      batchData.addTimeValuePair(reader.next());
+      return batchData;
+    }else{
+      return new ClusterNullableBatchData();
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
index 3b7fabe..5c1d9d1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
@@ -23,7 +23,10 @@ import static org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSerie
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
+import org.apache.iotdb.cluster.query.utils.ClusterTimeValuePairUtils;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Field;
@@ -56,7 +59,7 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements
   public List<BatchData> nextBatchList() throws IOException {
     List<BatchData> batchDataList = new ArrayList<>(paths.size());
     for (int i = 0; i < paths.size(); i++) {
-      batchDataList.add(new BatchData(dataTypes.get(i), true));
+      batchDataList.add(new ClusterNullableBatchData());
     }
     int dataPointCount = 0;
     while (true) {
@@ -68,10 +71,9 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements
       long time = rowRecord.getTimestamp();
       List<Field> fieldList = rowRecord.getFields();
       for (int j = 0; j < paths.size(); j++) {
-        BatchData batchData = batchDataList.get(j);
+        ClusterNullableBatchData batchData = (ClusterNullableBatchData) batchDataList.get(j);
         Object value = fieldList.get(j).getObjectValue(dataTypes.get(j));
-        batchData.putTime(time);
-        batchData.putAnObject(value);
+        batchData.addTimeValuePair(fieldList.get(j).toString().equals("null") ? null : ClusterTimeValuePairUtils.getTimeValuePair(time, value,dataTypes.get(j)));
       }
     }
     return batchDataList;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index 7525368..d9c8d75 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iotdb.cluster.query.utils;
 
-import org.apache.iotdb.cluster.query.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 public class ClusterTimeValuePairUtils {
 
@@ -35,10 +39,32 @@ public class ClusterTimeValuePairUtils {
    * @return -given data's (time,value) pair
    */
   public static TimeValuePair getCurrentTimeValuePair(BatchData data) {
-    if (data instanceof FillBatchData){
-      return ((FillBatchData)data).getTimeValuePair();
-    }else{
+    if (data instanceof ClusterNullableBatchData) {
+      return ((ClusterNullableBatchData) data).getTimeValuePair();
+    } else {
       return TimeValuePairUtils.getCurrentTimeValuePair(data);
     }
   }
+
+  /**
+   * Get (time,value) pair according to data type
+   */
+  public static TimeValuePair getTimeValuePair(long time, Object v, TSDataType dataType) {
+    switch (dataType) {
+      case INT32:
+        return new TimeValuePair(time, new TsPrimitiveType.TsInt((int) v));
+      case INT64:
+        return new TimeValuePair(time, new TsPrimitiveType.TsLong((long) v));
+      case FLOAT:
+        return new TimeValuePair(time, new TsPrimitiveType.TsFloat((float) v));
+      case DOUBLE:
+        return new TimeValuePair(time, new TsPrimitiveType.TsDouble((double) v));
+      case TEXT:
+        return new TimeValuePair(time, new TsPrimitiveType.TsBinary((Binary) v));
+      case BOOLEAN:
+        return new TimeValuePair(time, new TsPrimitiveType.TsBoolean((boolean) v));
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(v));
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index 197c7eb..b4a2f25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.cluster.rpc.raft;
 
 import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.QueryTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
 
 /**
  * Handle the request and process the result as a client with the current node
@@ -31,7 +31,8 @@ public interface NodeAsClient {
 
   /**
    * Asynchronous processing requests
-   *  @param leader leader node of the target group
+   *
+   * @param leader leader node of the target group
    * @param qpTask single QPTask to be executed
    */
   void asyncHandleRequest(BasicRequest request, PeerId leader,
@@ -39,8 +40,8 @@ public interface NodeAsClient {
 
   /**
    * Synchronous processing requests
-   * @param peerId leader node of the target group
    *
+   * @param peerId leader node of the target group
    */
   QueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
index 265d509..0165bba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
@@ -120,7 +120,6 @@ public class IOTDBGroupByIT {
     EnvironmentUtils.closeMemControl();
     CLUSTER_CONFIG.createAllPath();
     server = Server.getInstance();
-    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
     server.start();
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
@@ -135,7 +134,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void countSumMeanTest() throws SQLException {
+  public void countSumMeanTest() {
     String[] retArray1 = new String[]{
         "2,1,4.4,4.4",
         "5,3,35.8,11.933333333333332",
@@ -162,30 +161,10 @@ public class IOTDBGroupByIT {
     };
     try (Connection connection = DriverManager.
         getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
-//      Statement statement = connection.createStatement();
-//      boolean hasResultSet = statement.execute(
-//          "select count(temperature), sum(temperature), mean(temperature) from "
-//              + "root.ln.wf01.wt01 where time > 3 "
-//              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
-//
-//      Assert.assertTrue(hasResultSet);
-//      ResultSet resultSet = statement.getResultSet();
-//      int cnt = 0;
-//      while (resultSet.next()) {
-//        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
-//            .getString(count("root.ln.wf01.wt01.temperature")) + "," +
-//            resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
-//            .getString(mean("root.ln.wf01.wt01.temperature"));
-//        Assert.assertEquals(retArray1[cnt], ans);
-//        cnt++;
-//      }
-//      Assert.assertEquals(retArray1.length, cnt);
-//      statement.close();
-
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute(
           "select count(temperature), sum(temperature), mean(temperature) from "
-              + "root.ln.wf01.wt01 where temperature > 3 "
+              + "root.ln.wf01.wt01 where time > 3 "
               + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
 
       Assert.assertTrue(hasResultSet);
@@ -196,6 +175,26 @@ public class IOTDBGroupByIT {
             .getString(count("root.ln.wf01.wt01.temperature")) + "," +
             resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
             .getString(mean("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute(
+          "select count(temperature), sum(temperature), mean(temperature) from "
+              + "root.ln.wf01.wt01 where temperature > 3 "
+              + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+            .getString(count("root.ln.wf01.wt01.temperature")) + "," +
+            resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
+            .getString(mean("root.ln.wf01.wt01.temperature"));
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
       }
@@ -209,7 +208,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void maxMinValeTimeTest() throws SQLException {
+  public void maxMinValeTimeTest() {
     String[] retArray1 = new String[]{
         "2,4.4,4.4,4,4",
         "5,20.2,5.5,20,5",
@@ -274,7 +273,6 @@ public class IOTDBGroupByIT {
             + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
-        //System.out.println(ans);
       }
       Assert.assertEquals(retArray2.length, cnt);
       statement.close();
@@ -286,7 +284,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void firstLastTest() throws SQLException {
+  public void firstLastTest() {
     String[] retArray1 = new String[]{
         "2,4.4,4.4",
         "5,20.2,5.5",
@@ -325,7 +323,6 @@ public class IOTDBGroupByIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray1[cnt], ans);
         cnt++;
       }
@@ -345,7 +342,6 @@ public class IOTDBGroupByIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
       }
@@ -359,7 +355,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void largeIntervalTest() throws SQLException {
+  public void largeIntervalTest() {
     String[] retArray1 = new String[]{
         "2,4.4,4,20,4",
         "30,30.3,16,610,30",
@@ -410,7 +406,6 @@ public class IOTDBGroupByIT {
             + "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
-        //System.out.println(ans);
       }
       Assert.assertEquals(retArray2.length, cnt);
       statement.close();
@@ -422,7 +417,7 @@ public class IOTDBGroupByIT {
   }
 
   @Test
-  public void smallPartitionTest() throws SQLException {
+  public void smallPartitionTest() {
     String[] retArray1 = new String[]{
         "50,100.1,50.5,150.6",
         "615,500.5,500.5,500.5"
@@ -449,7 +444,6 @@ public class IOTDBGroupByIT {
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
             + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray1[cnt], ans);
         cnt++;
       }
@@ -470,7 +464,6 @@ public class IOTDBGroupByIT {
             .getString(last("root.ln.wf01.wt01.temperature"))
             + "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
             + resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
-        System.out.println(ans);
         Assert.assertEquals(retArray2[cnt], ans);
         cnt++;
       }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 77afbea..056a70f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -294,7 +294,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans =
             resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0)) + ","
                 + resultSet.getString(last(d0s1));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -323,7 +322,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans =
             resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0)) + ","
                 + resultSet.getString(first(d0s1));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -352,7 +350,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0))
             + "," + resultSet.getString(sum(d0s1)) + "," + Math
             .round(resultSet.getDouble(sum(d0s2)));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -381,7 +378,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(mean(d0s0))
             + "," + Math.round(resultSet.getDouble(mean(d0s1))) + ","
             + Math.round(resultSet.getDouble(mean(d0s2)));
-        //System.out.println("!!!!!============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -404,7 +400,6 @@ public class IoTDBAggregationSmallDataIT {
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3),"
           + "count(s4) from root.vehicle.d0 where s2 >= 3.33");
-      // System.out.println(hasResultSet + "...");
       Assert.assertTrue(hasResultSet);
       ResultSet resultSet = statement.getResultSet();
       int cnt = 0;
@@ -412,7 +407,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0))
             + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2))
             + "," + resultSet.getString(count(d0s3)) + "," + resultSet.getString(count(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(retArray[cnt], ans);
         cnt++;
       }
@@ -444,7 +438,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(min_time(d0s0))
             + "," + resultSet.getString(min_time(d0s1)) + "," + resultSet.getString(min_time(d0s2))
             + "," + resultSet.getString(min_time(d0s3)) + "," + resultSet.getString(min_time(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
         Assert.assertEquals(1, cnt);
@@ -476,7 +469,6 @@ public class IoTDBAggregationSmallDataIT {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0))
             + "," + resultSet.getString(max_time(d0s1)) + "," + resultSet.getString(max_time(d0s2))
             + "," + resultSet.getString(max_time(d0s3)) + "," + resultSet.getString(max_time(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
@@ -510,7 +502,6 @@ public class IoTDBAggregationSmallDataIT {
             "," + resultSet.getString(min_value(d0s2))
             + "," + resultSet.getString(min_value(d0s3)) + ","
             + resultSet.getString(min_value(d0s4));
-        // System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
@@ -545,8 +536,6 @@ public class IoTDBAggregationSmallDataIT {
             .getString(max_value(d0s2))
             + "," + resultSet.getString(max_value(d0s3)) + "," + resultSet
             .getString(max_value(d0s4));
-        //System.out.println("============ " + ans);
-        //Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
       Assert.assertEquals(1, cnt);
@@ -568,13 +557,11 @@ public class IoTDBAggregationSmallDataIT {
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute(
           "select count(s0) from root.vehicle.d0 where s2 >= 3.33");
-      // System.out.println(hasResultSet + "...");
       Assert.assertTrue(hasResultSet);
       ResultSet resultSet = statement.getResultSet();
       int cnt = 0;
       while (resultSet.next()) {
         String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0));
-        //System.out.println("============ " + ans);
         Assert.assertEquals(ans, retArray[cnt]);
         cnt++;
       }
@@ -613,7 +600,6 @@ public class IoTDBAggregationSmallDataIT {
         (Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) {
       Statement statement = connection.createStatement();
       boolean hasResultSet = statement.execute("select * from root");
-      // System.out.println(hasResultSet + "...");
       if (hasResultSet) {
         ResultSet resultSet = statement.getResultSet();
         int cnt = 0;
@@ -621,7 +607,6 @@ public class IoTDBAggregationSmallDataIT {
           String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s0) + ","
               + resultSet.getString(d0s1) + "," + resultSet.getString(d0s2) + "," +
               resultSet.getString(d0s3) + "," + resultSet.getString(d1s0);
-          // System.out.println(ans);
           Assert.assertEquals(ans, retArray[cnt]);
           cnt++;
         }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 528b378..002c28f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -42,19 +42,21 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
 
   protected List<EngineReaderByTimeStamp> allDataReaderList;
   protected TimeGenerator timestampGenerator;
+
   /**
    * cached timestamp for next group by partition.
    */
-  private long timestamp;
+  protected long timestamp;
+
   /**
    * if this object has cached timestamp for next group by partition.
    */
-  private boolean hasCachedTimestamp;
+  protected boolean hasCachedTimestamp;
 
   /**
    * group by batch calculation size.
    */
-  private int timestampFetchSize;
+  protected int timestampFetchSize;
 
   /**
    * constructor.
@@ -152,7 +154,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
     return timeArrayLength;
   }
 
-  private RowRecord constructRowRecord() {
+  protected RowRecord constructRowRecord() {
     RowRecord record = new RowRecord(startTime);
     functions.forEach(function -> record.addField(getField(function.getResult())));
     return record;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index aeb789e..de52849 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -378,11 +378,11 @@ public class BatchData implements Serializable {
   /**
    * Checks if the given index is in range. If not, throws an appropriate runtime exception.
    */
-  private void rangeCheckForTime(int idx) {
+  protected void rangeCheckForTime(int idx) {
     if (idx < 0) {
       throw new IndexOutOfBoundsException("BatchData time range check, Index is negative: " + idx);
     }
-    if (idx >= timeLength) {
+    if (idx >= length()) {
       throw new IndexOutOfBoundsException(
           "BatchData time range check, Index : " + idx + ". Length : " + timeLength);
     }


[incubator-iotdb] 03/03: remove useless code

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 61e8452984f509f17a64001f3b5c1000cc447b03
Author: lta <li...@163.com>
AuthorDate: Tue May 21 21:01:54 2019 +0800

    remove useless code
---
 .../main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 17d5604..515fb6a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -355,7 +355,6 @@ public class NonQueryExecutor extends AbstractQPExecutor {
             RaftUtils.updateRaftGroupLeader(groupId, nextNode);
           } catch (RaftConnectionException e1) {
             LOGGER.debug("Non-query task for group {} to node {} fail.", groupId, nextNode);
-            continue;
           }
         }
         LOGGER.debug("The final result for non-query task is {}", success);


[incubator-iotdb] 02/03: Merge branch 'cluster' of github.com:apache/incubator-iotdb into cluster

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2a3028977f03158dc7e8244647b0e56f995f8339
Merge: 626f2a3 39c8865
Author: lta <li...@163.com>
AuthorDate: Tue May 21 21:00:11 2019 +0800

    Merge branch 'cluster' of github.com:apache/incubator-iotdb into cluster

 .../cluster/qp/executor/NonQueryExecutor.java      | 38 +++++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)