You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 13:02:10 UTC
[incubator-iotdb] 01/03: add group by it and add
ClusterNullableBatach data to handle null timevalue pair
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 626f2a39c9cb2e71c14d0a0fba9ad8c90d0b8d47
Author: lta <li...@163.com>
AuthorDate: Tue May 21 20:58:57 2019 +0800
add group by it and add ClusterNullableBatach data to handle null timevalue pair
---
.../apache/iotdb/cluster/config/ClusterConfig.java | 2 +-
.../cluster/qp/executor/NonQueryExecutor.java | 2 -
...atchData.java => ClusterNullableBatchData.java} | 42 ++++++---
.../ClusterGroupByDataSetWithOnlyTimeFilter.java | 12 ++-
.../ClusterGroupByDataSetWithTimeGenerator.java | 101 +++++++++++++++++++++
.../cluster/query/executor/ClusterQueryRouter.java | 2 +-
.../querynode/ClusterLocalQueryManager.java | 7 +-
.../querynode/ClusterLocalSingleQueryManager.java | 10 +-
.../ClusterFillSelectSeriesBatchReader.java | 11 ++-
...lusterGroupBySelectSeriesBatchReaderEntity.java | 10 +-
.../query/utils/ClusterTimeValuePairUtils.java | 34 ++++++-
.../iotdb/cluster/rpc/raft/NodeAsClient.java | 7 +-
.../iotdb/cluster/integration/IOTDBGroupByIT.java | 59 ++++++------
.../integration/IoTDBAggregationSmallDataIT.java | 15 ---
.../groupby/GroupByWithValueFilterDataSet.java | 10 +-
.../apache/iotdb/tsfile/read/common/BatchData.java | 4 +-
16 files changed, 228 insertions(+), 100 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 45df39f..627f561 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -102,7 +102,7 @@ public class ClusterConfig {
* then it sends requests to other nodes in the cluster. This parameter represents the maximum
* timeout for these requests. The unit is milliseconds.
**/
- private int qpTaskTimeout = 500000;
+ private int qpTaskTimeout = 1000;
/**
* Number of virtual nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 75fc3a8..ac9bddf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -352,8 +352,6 @@ public class NonQueryExecutor extends AbstractQPExecutor {
return RaftUtils.executeRaftTaskForLocalProcessor(service, qpTask, response);
}
-
-
/**
* Async handle task by QPTask and leader id.
*
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
similarity index 54%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
index 2d17d0c..8315a04 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/common/FillBatchData.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/common/ClusterNullableBatchData.java
@@ -18,48 +18,62 @@
*/
package org.apache.iotdb.cluster.query.common;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
/**
- * <code>FillBatchData</code> is a self-defined data structure which is used in cluster query
- * process of fill type, which only contains one TimeValuePair and value can be null.
+ * <code>ClusterNullableBatchData</code> is a self-defined data structure which is used in cluster
+ * query process of fill type and group by type, which may contain <code>null</code> in list of
+ * TimeValuePair.
*/
-public class FillBatchData extends BatchData {
+public class ClusterNullableBatchData extends BatchData {
- private TimeValuePair timeValuePair;
- private boolean isUsed;
+ private List<TimeValuePair> timeValuePairList;
+ private int index;
- public FillBatchData(TimeValuePair timeValuePair, boolean isUsed) {
- this.timeValuePair = timeValuePair;
- this.isUsed = isUsed;
+ public ClusterNullableBatchData() {
+ this.timeValuePairList = new ArrayList<>();
+ this.index = 0;
}
@Override
public boolean hasNext() {
- return !isUsed;
+ return index < timeValuePairList.size();
}
@Override
public void next() {
- isUsed = true;
+ index++;
}
@Override
public long currentTime() {
- return timeValuePair.getTimestamp();
+ rangeCheckForTime(index);
+ return timeValuePairList.get(index).getTimestamp();
}
@Override
public Object currentValue() {
- if (!isUsed) {
- return timeValuePair.getValue() == null ? null : timeValuePair.getValue().getValue();
+ if (index < length()) {
+ return timeValuePairList.get(index).getValue() == null ? null
+ : timeValuePairList.get(index).getValue().getValue();
} else {
return null;
}
}
+ @Override
+ public int length() {
+ return timeValuePairList.size();
+ }
+
public TimeValuePair getTimeValuePair() {
- return isUsed ? null : timeValuePair;
+ return index < length() ? timeValuePairList.get(index) : null;
+ }
+
+ public void addTimeValuePair(TimeValuePair timeValuePair){
+ timeValuePairList.add(timeValuePair);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
index 98460e4..ef5386d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithOnlyTimeFilter.java
@@ -64,7 +64,7 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime
List<Path> paths, long unit, long origin,
List<Pair<Long, Long>> mergedIntervals, ClusterRpcSingleQueryManager queryManager) {
super(jobId, paths, unit, origin, mergedIntervals);
- this.queryManager =queryManager;
+ this.queryManager = queryManager;
this.readersOfSelectedSeries = new ArrayList<>();
}
@@ -132,10 +132,14 @@ public class ClusterGroupByDataSetWithOnlyTimeFilter extends GroupByWithOnlyTime
RowRecord record = new RowRecord(startTime);
for (int i = 0; i < functions.size(); i++) {
IPointReader reader = readersOfSelectedSeries.get(i);
- if(reader != null){
+ if (reader != null) {
TimeValuePair timeValuePair = reader.next();
- record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
- }else {
+ if (timeValuePair == null) {
+ record.addField(new Field(null));
+ } else {
+ record.addField(getField(timeValuePair.getValue().getValue(), dataTypes.get(i)));
+ }
+ } else {
AggreResultData res;
try {
res = nextSeries(i);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
index 00f2d88..89ed1b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/dataset/ClusterGroupByDataSetWithTimeGenerator.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterSeriesGroupEntity;
@@ -30,11 +31,13 @@ import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -88,4 +91,102 @@ public class ClusterGroupByDataSetWithTimeGenerator extends GroupByWithValueFilt
.createReadersByTimestampOfSelectedPaths(selectedSeries, context, queryManager,
selectSeriesDataTypes);
}
+
+ @Override
+ public RowRecord next() throws IOException {
+ if (!hasCachedTimeInterval) {
+ throw new IOException("need to call hasNext() before calling next()"
+ + " in GroupByWithOnlyTimeFilterDataSet.");
+ }
+ hasCachedTimeInterval = false;
+ for (AggregateFunction function : functions) {
+ function.init();
+ }
+
+ long[] timestampArray = new long[timestampFetchSize];
+ int timeArrayLength = 0;
+ if (hasCachedTimestamp) {
+ if (timestamp < endTime) {
+ hasCachedTimestamp = false;
+ timestampArray[timeArrayLength++] = timestamp;
+ } else {
+ return constructRowRecord();
+ }
+ }
+
+ while (timestampGenerator.hasNext()) {
+ // construct timestamp array
+ timeArrayLength = constructTimeArrayForOneCal(timestampArray, timeArrayLength);
+
+ fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray);
+
+ // cal result using timestamp array
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ functions.get(i).calcAggregationUsingTimestamps(
+ timestampArray, timeArrayLength, allDataReaderList.get(i));
+ }
+
+ timeArrayLength = 0;
+ // judge if it's end
+ if (timestamp >= endTime) {
+ hasCachedTimestamp = true;
+ break;
+ }
+ }
+
+ // fetch select series data from remote node
+ fetchSelectDataFromRemoteNode(timeArrayLength, timestampArray);
+
+ if (timeArrayLength > 0) {
+ // cal result using timestamp array
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ functions.get(i).calcAggregationUsingTimestamps(
+ timestampArray, timeArrayLength, allDataReaderList.get(i));
+ }
+ }
+ return constructRowRecord();
+ }
+
+ /**
+ * Get select series batch data by batch timestamp
+ * @param timeArrayLength length of batch timestamp
+ * @param timestampArray timestamp array
+ */
+ private void fetchSelectDataFromRemoteNode(int timeArrayLength, long[] timestampArray)
+ throws IOException {
+ if(timeArrayLength != 0){
+ List<Long> batchTimestamp = new ArrayList<>();
+ for(int i = 0 ; i < timeArrayLength; i++){
+ batchTimestamp.add(timestampArray[i]);
+ }
+
+ try {
+ queryManager.fetchBatchDataByTimestampForAllSelectPaths(batchTimestamp);
+ } catch (
+ RaftConnectionException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * construct an array of timestamps for one batch of a group by partition calculating.
+ *
+ * @param timestampArray timestamp array
+ * @param timeArrayLength the current length of timestamp array
+ * @return time array length
+ */
+ private int constructTimeArrayForOneCal(long[] timestampArray, int timeArrayLength)
+ throws IOException {
+ for (int cnt = 1; cnt < timestampFetchSize && timestampGenerator.hasNext(); cnt++) {
+ timestamp = timestampGenerator.next();
+ if (timestamp < endTime) {
+ timestampArray[timeArrayLength++] = timestamp;
+ } else {
+ hasCachedTimestamp = true;
+ break;
+ }
+ }
+ return timeArrayLength;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 54e0df5..9f9dc41 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -168,7 +168,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
.optimize(expression, selectedSeries);
try {
if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-// queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
+ queryManager.initQueryResource(QueryType.GLOBAL_TIME, getReadDataConsistencyLevel());
ClusterGroupByDataSetWithOnlyTimeFilter groupByEngine = new ClusterGroupByDataSetWithOnlyTimeFilter(
jobId, selectedSeries, unit, origin, mergedIntervalList, queryManager);
groupByEngine.initGroupBy(context, aggres, optimizedExpression);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index c83e2a2..4e09af8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -59,12 +59,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
TASK_ID_MAP_JOB_ID.put(taskId, jobId);
ClusterLocalSingleQueryManager localQueryManager = new ClusterLocalSingleQueryManager(jobId);
SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
- try {
- return localQueryManager.createSeriesReader(request);
- }catch (Exception e){
- e.printStackTrace();
- return null;
- }
+ return localQueryManager.createSeriesReader(request);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index e9c0dcb..25adbf5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -492,11 +492,11 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
@Override
public void run() {
-// try {
-//// close();
-// } catch (FileNodeManagerException e) {
-// LOGGER.error(e.getMessage());
-// }
+ try {
+ close();
+ } catch (FileNodeManagerException e) {
+ LOGGER.error(e.getMessage());
+ }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
index fadd92f..a16a220 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
@@ -19,8 +19,9 @@
package org.apache.iotdb.cluster.query.reader.querynode;
import java.io.IOException;
-import org.apache.iotdb.cluster.query.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -34,6 +35,12 @@ public class ClusterFillSelectSeriesBatchReader extends ClusterSelectSeriesBatch
@Override
public BatchData nextBatch() throws IOException {
- return hasNext() ? new FillBatchData(reader.next(), false) : new FillBatchData(null, true);
+ if(hasNext()){
+ ClusterNullableBatchData batchData = new ClusterNullableBatchData();
+ batchData.addTimeValuePair(reader.next());
+ return batchData;
+ }else{
+ return new ClusterNullableBatchData();
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
index 3b7fabe..5c1d9d1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterGroupBySelectSeriesBatchReaderEntity.java
@@ -23,7 +23,10 @@ import static org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSerie
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
+import org.apache.iotdb.cluster.query.utils.ClusterTimeValuePairUtils;
import org.apache.iotdb.db.query.dataset.groupby.GroupByWithOnlyTimeFilterDataSet;
+import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Field;
@@ -56,7 +59,7 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements
public List<BatchData> nextBatchList() throws IOException {
List<BatchData> batchDataList = new ArrayList<>(paths.size());
for (int i = 0; i < paths.size(); i++) {
- batchDataList.add(new BatchData(dataTypes.get(i), true));
+ batchDataList.add(new ClusterNullableBatchData());
}
int dataPointCount = 0;
while (true) {
@@ -68,10 +71,9 @@ public class ClusterGroupBySelectSeriesBatchReaderEntity implements
long time = rowRecord.getTimestamp();
List<Field> fieldList = rowRecord.getFields();
for (int j = 0; j < paths.size(); j++) {
- BatchData batchData = batchDataList.get(j);
+ ClusterNullableBatchData batchData = (ClusterNullableBatchData) batchDataList.get(j);
Object value = fieldList.get(j).getObjectValue(dataTypes.get(j));
- batchData.putTime(time);
- batchData.putAnObject(value);
+ batchData.addTimeValuePair(fieldList.get(j).toString().equals("null") ? null : ClusterTimeValuePairUtils.getTimeValuePair(time, value,dataTypes.get(j)));
}
}
return batchDataList;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
index 7525368..d9c8d75 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -18,10 +18,14 @@
*/
package org.apache.iotdb.cluster.query.utils;
-import org.apache.iotdb.cluster.query.common.FillBatchData;
+import org.apache.iotdb.cluster.query.common.ClusterNullableBatchData;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.Binary;
public class ClusterTimeValuePairUtils {
@@ -35,10 +39,32 @@ public class ClusterTimeValuePairUtils {
* @return -given data's (time,value) pair
*/
public static TimeValuePair getCurrentTimeValuePair(BatchData data) {
- if (data instanceof FillBatchData){
- return ((FillBatchData)data).getTimeValuePair();
- }else{
+ if (data instanceof ClusterNullableBatchData) {
+ return ((ClusterNullableBatchData) data).getTimeValuePair();
+ } else {
return TimeValuePairUtils.getCurrentTimeValuePair(data);
}
}
+
+ /**
+ * Get (time,value) pair according to data type
+ */
+ public static TimeValuePair getTimeValuePair(long time, Object v, TSDataType dataType) {
+ switch (dataType) {
+ case INT32:
+ return new TimeValuePair(time, new TsPrimitiveType.TsInt((int) v));
+ case INT64:
+ return new TimeValuePair(time, new TsPrimitiveType.TsLong((long) v));
+ case FLOAT:
+ return new TimeValuePair(time, new TsPrimitiveType.TsFloat((float) v));
+ case DOUBLE:
+ return new TimeValuePair(time, new TsPrimitiveType.TsDouble((double) v));
+ case TEXT:
+ return new TimeValuePair(time, new TsPrimitiveType.TsBinary((Binary) v));
+ case BOOLEAN:
+ return new TimeValuePair(time, new TsPrimitiveType.TsBoolean((boolean) v));
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(v));
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index 197c7eb..b4a2f25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.cluster.rpc.raft;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.QueryTask;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
/**
* Handle the request and process the result as a client with the current node
@@ -31,7 +31,8 @@ public interface NodeAsClient {
/**
* Asynchronous processing requests
- * @param leader leader node of the target group
+ *
+ * @param leader leader node of the target group
* @param qpTask single QPTask to be executed
*/
void asyncHandleRequest(BasicRequest request, PeerId leader,
@@ -39,8 +40,8 @@ public interface NodeAsClient {
/**
* Synchronous processing requests
- * @param peerId leader node of the target group
*
+ * @param peerId leader node of the target group
*/
QueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
index 265d509..0165bba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IOTDBGroupByIT.java
@@ -120,7 +120,6 @@ public class IOTDBGroupByIT {
EnvironmentUtils.closeMemControl();
CLUSTER_CONFIG.createAllPath();
server = Server.getInstance();
- QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
server.start();
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -135,7 +134,7 @@ public class IOTDBGroupByIT {
}
@Test
- public void countSumMeanTest() throws SQLException {
+ public void countSumMeanTest() {
String[] retArray1 = new String[]{
"2,1,4.4,4.4",
"5,3,35.8,11.933333333333332",
@@ -162,30 +161,10 @@ public class IOTDBGroupByIT {
};
try (Connection connection = DriverManager.
getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
-// Statement statement = connection.createStatement();
-// boolean hasResultSet = statement.execute(
-// "select count(temperature), sum(temperature), mean(temperature) from "
-// + "root.ln.wf01.wt01 where time > 3 "
-// + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
-//
-// Assert.assertTrue(hasResultSet);
-// ResultSet resultSet = statement.getResultSet();
-// int cnt = 0;
-// while (resultSet.next()) {
-// String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
-// .getString(count("root.ln.wf01.wt01.temperature")) + "," +
-// resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
-// .getString(mean("root.ln.wf01.wt01.temperature"));
-// Assert.assertEquals(retArray1[cnt], ans);
-// cnt++;
-// }
-// Assert.assertEquals(retArray1.length, cnt);
-// statement.close();
-
Statement statement = connection.createStatement();
boolean hasResultSet = statement.execute(
"select count(temperature), sum(temperature), mean(temperature) from "
- + "root.ln.wf01.wt01 where temperature > 3 "
+ + "root.ln.wf01.wt01 where time > 3 "
+ "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
Assert.assertTrue(hasResultSet);
@@ -196,6 +175,26 @@ public class IOTDBGroupByIT {
.getString(count("root.ln.wf01.wt01.temperature")) + "," +
resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
.getString(mean("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute(
+ "select count(temperature), sum(temperature), mean(temperature) from "
+ + "root.ln.wf01.wt01 where temperature > 3 "
+ + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString(count("root.ln.wf01.wt01.temperature")) + "," +
+ resultSet.getString(sum("root.ln.wf01.wt01.temperature")) + "," + resultSet
+ .getString(mean("root.ln.wf01.wt01.temperature"));
Assert.assertEquals(retArray2[cnt], ans);
cnt++;
}
@@ -209,7 +208,7 @@ public class IOTDBGroupByIT {
}
@Test
- public void maxMinValeTimeTest() throws SQLException {
+ public void maxMinValeTimeTest() {
String[] retArray1 = new String[]{
"2,4.4,4.4,4,4",
"5,20.2,5.5,20,5",
@@ -274,7 +273,6 @@ public class IOTDBGroupByIT {
+ "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
Assert.assertEquals(retArray2[cnt], ans);
cnt++;
- //System.out.println(ans);
}
Assert.assertEquals(retArray2.length, cnt);
statement.close();
@@ -286,7 +284,7 @@ public class IOTDBGroupByIT {
}
@Test
- public void firstLastTest() throws SQLException {
+ public void firstLastTest() {
String[] retArray1 = new String[]{
"2,4.4,4.4",
"5,20.2,5.5",
@@ -325,7 +323,6 @@ public class IOTDBGroupByIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
.getString(last("root.ln.wf01.wt01.temperature"))
+ "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
- System.out.println(ans);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -345,7 +342,6 @@ public class IOTDBGroupByIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
.getString(last("root.ln.wf01.wt01.temperature"))
+ "," + resultSet.getString(first("root.ln.wf01.wt01.temperature"));
- System.out.println(ans);
Assert.assertEquals(retArray2[cnt], ans);
cnt++;
}
@@ -359,7 +355,7 @@ public class IOTDBGroupByIT {
}
@Test
- public void largeIntervalTest() throws SQLException {
+ public void largeIntervalTest() {
String[] retArray1 = new String[]{
"2,4.4,4,20,4",
"30,30.3,16,610,30",
@@ -410,7 +406,6 @@ public class IOTDBGroupByIT {
+ "," + resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
Assert.assertEquals(retArray2[cnt], ans);
cnt++;
- //System.out.println(ans);
}
Assert.assertEquals(retArray2.length, cnt);
statement.close();
@@ -422,7 +417,7 @@ public class IOTDBGroupByIT {
}
@Test
- public void smallPartitionTest() throws SQLException {
+ public void smallPartitionTest() {
String[] retArray1 = new String[]{
"50,100.1,50.5,150.6",
"615,500.5,500.5,500.5"
@@ -449,7 +444,6 @@ public class IOTDBGroupByIT {
.getString(last("root.ln.wf01.wt01.temperature"))
+ "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
+ resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
- System.out.println(ans);
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -470,7 +464,6 @@ public class IOTDBGroupByIT {
.getString(last("root.ln.wf01.wt01.temperature"))
+ "," + resultSet.getString(first("root.ln.wf01.wt01.temperature")) + ","
+ resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
- System.out.println(ans);
Assert.assertEquals(retArray2[cnt], ans);
cnt++;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
index 77afbea..056a70f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationSmallDataIT.java
@@ -294,7 +294,6 @@ public class IoTDBAggregationSmallDataIT {
String ans =
resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0)) + ","
+ resultSet.getString(last(d0s1));
- //System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
@@ -323,7 +322,6 @@ public class IoTDBAggregationSmallDataIT {
String ans =
resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0)) + ","
+ resultSet.getString(first(d0s1));
- //System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
@@ -352,7 +350,6 @@ public class IoTDBAggregationSmallDataIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0))
+ "," + resultSet.getString(sum(d0s1)) + "," + Math
.round(resultSet.getDouble(sum(d0s2)));
- //System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
@@ -381,7 +378,6 @@ public class IoTDBAggregationSmallDataIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(mean(d0s0))
+ "," + Math.round(resultSet.getDouble(mean(d0s1))) + ","
+ Math.round(resultSet.getDouble(mean(d0s2)));
- //System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
@@ -404,7 +400,6 @@ public class IoTDBAggregationSmallDataIT {
Statement statement = connection.createStatement();
boolean hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3),"
+ "count(s4) from root.vehicle.d0 where s2 >= 3.33");
- // System.out.println(hasResultSet + "...");
Assert.assertTrue(hasResultSet);
ResultSet resultSet = statement.getResultSet();
int cnt = 0;
@@ -412,7 +407,6 @@ public class IoTDBAggregationSmallDataIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0))
+ "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2))
+ "," + resultSet.getString(count(d0s3)) + "," + resultSet.getString(count(d0s4));
- // System.out.println("============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
@@ -444,7 +438,6 @@ public class IoTDBAggregationSmallDataIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(min_time(d0s0))
+ "," + resultSet.getString(min_time(d0s1)) + "," + resultSet.getString(min_time(d0s2))
+ "," + resultSet.getString(min_time(d0s3)) + "," + resultSet.getString(min_time(d0s4));
- // System.out.println("============ " + ans);
Assert.assertEquals(ans, retArray[cnt]);
cnt++;
Assert.assertEquals(1, cnt);
@@ -476,7 +469,6 @@ public class IoTDBAggregationSmallDataIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0))
+ "," + resultSet.getString(max_time(d0s1)) + "," + resultSet.getString(max_time(d0s2))
+ "," + resultSet.getString(max_time(d0s3)) + "," + resultSet.getString(max_time(d0s4));
- // System.out.println("============ " + ans);
Assert.assertEquals(ans, retArray[cnt]);
cnt++;
}
@@ -510,7 +502,6 @@ public class IoTDBAggregationSmallDataIT {
"," + resultSet.getString(min_value(d0s2))
+ "," + resultSet.getString(min_value(d0s3)) + ","
+ resultSet.getString(min_value(d0s4));
- // System.out.println("============ " + ans);
Assert.assertEquals(ans, retArray[cnt]);
cnt++;
}
@@ -545,8 +536,6 @@ public class IoTDBAggregationSmallDataIT {
.getString(max_value(d0s2))
+ "," + resultSet.getString(max_value(d0s3)) + "," + resultSet
.getString(max_value(d0s4));
- //System.out.println("============ " + ans);
- //Assert.assertEquals(ans, retArray[cnt]);
cnt++;
}
Assert.assertEquals(1, cnt);
@@ -568,13 +557,11 @@ public class IoTDBAggregationSmallDataIT {
Statement statement = connection.createStatement();
boolean hasResultSet = statement.execute(
"select count(s0) from root.vehicle.d0 where s2 >= 3.33");
- // System.out.println(hasResultSet + "...");
Assert.assertTrue(hasResultSet);
ResultSet resultSet = statement.getResultSet();
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0));
- //System.out.println("============ " + ans);
Assert.assertEquals(ans, retArray[cnt]);
cnt++;
}
@@ -613,7 +600,6 @@ public class IoTDBAggregationSmallDataIT {
(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root")) {
Statement statement = connection.createStatement();
boolean hasResultSet = statement.execute("select * from root");
- // System.out.println(hasResultSet + "...");
if (hasResultSet) {
ResultSet resultSet = statement.getResultSet();
int cnt = 0;
@@ -621,7 +607,6 @@ public class IoTDBAggregationSmallDataIT {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(d0s0) + ","
+ resultSet.getString(d0s1) + "," + resultSet.getString(d0s2) + "," +
resultSet.getString(d0s3) + "," + resultSet.getString(d1s0);
- // System.out.println(ans);
Assert.assertEquals(ans, retArray[cnt]);
cnt++;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 528b378..002c28f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -42,19 +42,21 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
protected List<EngineReaderByTimeStamp> allDataReaderList;
protected TimeGenerator timestampGenerator;
+
/**
* cached timestamp for next group by partition.
*/
- private long timestamp;
+ protected long timestamp;
+
/**
* if this object has cached timestamp for next group by partition.
*/
- private boolean hasCachedTimestamp;
+ protected boolean hasCachedTimestamp;
/**
* group by batch calculation size.
*/
- private int timestampFetchSize;
+ protected int timestampFetchSize;
/**
* constructor.
@@ -152,7 +154,7 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
return timeArrayLength;
}
- private RowRecord constructRowRecord() {
+ protected RowRecord constructRowRecord() {
RowRecord record = new RowRecord(startTime);
functions.forEach(function -> record.addField(getField(function.getResult())));
return record;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index aeb789e..de52849 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -378,11 +378,11 @@ public class BatchData implements Serializable {
/**
* Checks if the given index is in range. If not, throws an appropriate runtime exception.
*/
- private void rangeCheckForTime(int idx) {
+ protected void rangeCheckForTime(int idx) {
if (idx < 0) {
throw new IndexOutOfBoundsException("BatchData time range check, Index is negative: " + idx);
}
- if (idx >= timeLength) {
+ if (idx >= length()) {
throw new IndexOutOfBoundsException(
"BatchData time range check, Index : " + idx + ". Length : " + timeLength);
}