You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/05/21 01:51:30 UTC
[incubator-iotdb] 03/11: fix a severe bug of filter serialization
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit aa3aa0d943626e3864753dd93ff4585e6f60e3eb
Author: lta <li...@163.com>
AuthorDate: Fri May 17 15:08:50 2019 +0800
fix a severe bug of filter serialization
---
.../executor/ClusterAggregateEngineExecutor.java | 140 ++++-
.../ClusterRpcSingleQueryManager.java | 4 +-
.../querynode/ClusterLocalQueryManager.java | 2 +-
.../querynode/ClusterLocalSingleQueryManager.java | 25 +-
.../querynode/IClusterLocalQueryManager.java | 4 +-
.../querynode/IClusterLocalSingleQueryManager.java | 5 +-
.../querynode/ClusterFilterSeriesBatchReader.java | 8 +-
.../cluster/query/utils/ClusterRpcReaderUtils.java | 28 +-
.../query/utils/QueryPlanPartitionUtils.java | 2 +-
.../request/querydata/InitSeriesReaderRequest.java | 72 ++-
.../apache/iotdb/cluster/integration/Constant.java | 100 ++++
.../cluster/integration/IoTDBAggregationIT.java | 640 +++++++++++++++++++++
.../cluster/integration/IoTDBFillQueryIT.java | 1 -
.../db/query/executor/AggregateEngineExecutor.java | 14 +-
.../timegenerator/AbstractNodeConstructor.java | 3 -
15 files changed, 996 insertions(+), 52 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
index b63b311..51113c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterAggregateEngineExecutor.java
@@ -20,10 +20,18 @@ package org.apache.iotdb.cluster.query.executor;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
import org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
+import org.apache.iotdb.cluster.query.timegenerator.ClusterTimeGenerator;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -41,6 +49,7 @@ import org.apache.iotdb.db.query.executor.AggregateEngineExecutor;
import org.apache.iotdb.db.query.factory.AggreFuncFactory;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -49,10 +58,16 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+/**
+ * Handle aggregation query and construct dataset in cluster
+ */
public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
private ClusterRpcSingleQueryManager queryManager;
+ private static final ClusterConfig CLUSTER_CONF = ClusterDescriptor.getInstance().getConfig();
+
public ClusterAggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
IExpression expression, ClusterRpcSingleQueryManager queryManager) {
@@ -80,7 +95,7 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
paths.add(path);
// construct AggregateFunction
TSDataType tsDataType = MManager.getInstance()
- .getSeriesType(selectedSeries.get(i).getFullPath());
+ .getSeriesType(path.getFullPath());
AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
function.init();
@@ -103,6 +118,8 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
AggreResultData aggreResultData = aggregateWithoutTimeGenerator(function,
sequenceReader, unSeqMergeReader, timeFilter);
+
+ dataTypes.add(aggreResultData.getDataType());
readers.add(new AggreResultDataPointReader(aggreResultData));
}
}
@@ -111,4 +128,125 @@ public class ClusterAggregateEngineExecutor extends AggregateEngineExecutor {
return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, readers);
}
+
+ /**
+ * execute aggregate function with value filter.
+ *
+ * @param context query context.
+ */
+ @Override
+ public QueryDataSet executeWithTimeGenerator(QueryContext context)
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+
+ /** add query token for query series which can handle locally **/
+ List<Path> localQuerySeries = new ArrayList<>(selectedSeries);
+ Set<Path> remoteQuerySeries = queryManager.getSelectSeriesReaders().keySet();
+ localQuerySeries.removeAll(remoteQuerySeries);
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenQueryPaths(context.getJobId(), localQuerySeries);
+
+ /** add query token for filter series which can handle locally **/
+ Set<String> deviceIdSet = new HashSet<>();
+ for (FilterGroupEntity filterGroupEntity : queryManager.getFilterGroupEntityMap().values()) {
+ List<Path> remoteFilterSeries = filterGroupEntity.getFilterPaths();
+ remoteFilterSeries.forEach(seriesPath -> deviceIdSet.add(seriesPath.getDevice()));
+ }
+ QueryResourceManager.getInstance()
+ .beginQueryOfGivenExpression(context.getJobId(), expression, deviceIdSet);
+
+ ClusterTimeGenerator timestampGenerator;
+ List<EngineReaderByTimeStamp> readersOfSelectedSeries;
+ try {
+ timestampGenerator = new ClusterTimeGenerator(expression, context,
+ queryManager);
+ readersOfSelectedSeries = ClusterSeriesReaderFactory
+ .createReadersByTimestampOfSelectedPaths(selectedSeries, context,
+ queryManager);
+ } catch (IOException ex) {
+ throw new FileNodeManagerException(ex);
+ }
+
+ /** Get data type of select paths **/
+ List<TSDataType> originDataTypes = new ArrayList<>();
+ Map<Path, ClusterSelectSeriesReader> selectSeriesReaders = queryManager
+ .getSelectSeriesReaders();
+ for (Path path : selectedSeries) {
+ try {
+ if (selectSeriesReaders.containsKey(path)) {
+ originDataTypes.add(selectSeriesReaders.get(path).getDataType());
+ } else {
+ originDataTypes.add(MManager.getInstance().getSeriesType(path.getFullPath()));
+ }
+ } catch (PathErrorException e) {
+ throw new FileNodeManagerException(e);
+ }
+ }
+
+ List<AggregateFunction> aggregateFunctions = new ArrayList<>();
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ TSDataType type = originDataTypes.get(i);
+ AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), type);
+ function.init();
+ aggregateFunctions.add(function);
+ }
+ List<AggreResultData> aggreResultDataList = aggregateWithTimeGenerator(aggregateFunctions,
+ timestampGenerator,
+ readersOfSelectedSeries);
+
+ List<IPointReader> resultDataPointReaders = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (AggreResultData resultData : aggreResultDataList) {
+ dataTypes.add(resultData.getDataType());
+ resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
+ }
+ return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, resultDataPointReaders);
+ }
+
+ /**
+ * calculation aggregate result with value filter.
+ */
+ @Override
+ protected List<AggreResultData> aggregateWithTimeGenerator(
+ List<AggregateFunction> aggregateFunctions,
+ TimeGenerator timestampGenerator,
+ List<EngineReaderByTimeStamp> readersOfSelectedSeries)
+ throws IOException {
+
+ while (timestampGenerator.hasNext()) {
+
+ // generate timestamps for aggregate
+ long[] timeArray = new long[aggregateFetchSize];
+ List<Long> batchTimestamp = new ArrayList<>();
+ int timeArrayLength = 0;
+ for (int cnt = 0; cnt < aggregateFetchSize; cnt++) {
+ if (!timestampGenerator.hasNext()) {
+ break;
+ }
+ long time = timestampGenerator.next();
+ timeArray[timeArrayLength++] = time;
+ batchTimestamp.add(time);
+ }
+
+ // fetch all remote select series data by timestamp list.
+ if (!batchTimestamp.isEmpty()) {
+ try {
+ queryManager.fetchBatchDataByTimestampForAllSelectPaths(batchTimestamp);
+ } catch (RaftConnectionException e) {
+ throw new IOException(e);
+ }
+ }
+
+ // cal part of aggregate result
+ for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
+ aggregateFunctions.get(i).calcAggregationUsingTimestamps(timeArray, timeArrayLength,
+ readersOfSelectedSeries.get(i));
+ }
+ }
+
+ List<AggreResultData> aggreResultDataArrayList = new ArrayList<>();
+ for (AggregateFunction function : aggregateFunctions) {
+ aggreResultDataArrayList.add(function.getResult());
+ }
+ return aggreResultDataArrayList;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index faeda22..6c4f2ad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -134,7 +134,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
* group
*/
private void initSeriesReader(int readDataConsistencyLevel)
- throws RaftConnectionException {
+ throws RaftConnectionException, IOException {
// Init all series with data group of select series,if filter series has the same data group, init them together.
for (Entry<String, QueryPlan> entry : selectPathPlans.entrySet()) {
String groupId = entry.getKey();
@@ -144,7 +144,7 @@ public class ClusterRpcSingleQueryManager implements IClusterRpcSingleQueryManag
queryNodes.put(groupId, randomPeer);
Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
- List<Filter> filterList = null;
+ List<Filter> filterList = new ArrayList<>();
if (filterGroupEntityMap.containsKey(groupId)) {
FilterGroupEntity filterGroupEntity = filterGroupEntityMap.get(groupId);
allQueryPlan.put(PathType.FILTER_PATH, filterGroupEntity.getQueryPlan());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index fe3ac52..4e09af8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@ -53,7 +53,7 @@ public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
@Override
public InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest request)
- throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException {
+ throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException {
long jobId = QueryResourceManager.getInstance().assignJobId();
String taskId = request.getTaskId();
TASK_ID_MAP_JOB_ID.put(taskId, jobId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 76a141e..0f2cf62 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -64,9 +64,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
-import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.slf4j.Logger;
@@ -75,7 +73,8 @@ import org.slf4j.LoggerFactory;
public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClusterLocalSingleQueryManager.class);
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(ClusterLocalSingleQueryManager.class);
private String groupId;
@@ -127,7 +126,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
@Override
public InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
- throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException {
+ throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException {
this.groupId = request.getGroupID();
InitSeriesReaderResponse response = new InitSeriesReaderResponse(groupId);
QueryContext context = new QueryContext(jobId);
@@ -199,7 +198,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
if (queryPlan.getExpression() == null
|| queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
- handleAggreSeriesReaderWithoutTimeGenerator(queryPlan,context,response);
+ handleAggreSeriesReaderWithoutTimeGenerator(queryPlan, context, response);
} else {
handleSelectReaderWithTimeGenerator(queryPlan, context, response);
}
@@ -210,24 +209,23 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
*
* @param queryPlan fill query plan
*/
- private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan, QueryContext context,
+ private void handleAggreSeriesReaderWithoutTimeGenerator(QueryPlan queryPlan,
+ QueryContext context,
InitSeriesReaderResponse response)
- throws FileNodeManagerException, PathErrorException, IOException, QueryFilterOptimizationException, ProcessorException {
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
AggregationPlan fillQueryPlan = (AggregationPlan) queryPlan;
List<Path> selectedPaths = fillQueryPlan.getPaths();
QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedPaths);
- IExpression optimizedExpression = ExpressionOptimizer.getInstance()
- .optimize(fillQueryPlan.getExpression(), selectedPaths);
AggregateEngineExecutor engineExecutor = new AggregateEngineExecutor(
- selectedPaths, fillQueryPlan.getAggregations(), optimizedExpression);
+ selectedPaths, fillQueryPlan.getAggregations(), fillQueryPlan.getExpression());
List<IPointReader> readers = engineExecutor.constructAggreReadersWithoutTimeGenerator(context);
List<TSDataType> dataTypes = engineExecutor.getDataTypes();
- for (int i =0 ; i < selectedPaths.size(); i ++) {
+ for (int i = 0; i < selectedPaths.size(); i++) {
Path path = selectedPaths.get(i);
selectSeriesReaders.put(path.getFullPath(),
new ClusterSelectSeriesBatchReader(dataTypes.get(i), readers.get(i)));
@@ -291,7 +289,7 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
*/
private void handleFilterSeriesReader(QueryPlan plan, QueryContext context,
InitSeriesReaderRequest request, InitSeriesReaderResponse response, PathType pathType)
- throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException {
+ throws PathErrorException, QueryFilterOptimizationException, FileNodeManagerException, ProcessorException, IOException, ClassNotFoundException {
QueryDataSet queryDataSet = queryProcessExecutor
.processQuery(plan, context);
List<Path> paths = plan.getPaths();
@@ -321,7 +319,8 @@ public class ClusterLocalSingleQueryManager implements IClusterLocalSingleQueryM
.createReaderByTimeStamp(path, context);
TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
selectSeriesReaders
- .put(path.getFullPath(), new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
+ .put(path.getFullPath(),
+ new ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
dataTypeMap.put(path.getFullPath(), dataType);
dataTypeList.add(dataType);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
index cc0f103..1105bb2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
@@ -42,7 +42,7 @@ public interface IClusterLocalQueryManager {
* @param request request for query data from coordinator node
*/
InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest request)
- throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException;
+ throws IOException, FileNodeManagerException, PathErrorException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException;
/**
* Read batch data of all querying series in request and set response.
@@ -54,8 +54,8 @@ public interface IClusterLocalQueryManager {
/**
* Read batch data of select series by batch timestamp which is used in query with value filter
- * @param request request of querying select paths
*
+ * @param request request of querying select paths
*/
QuerySeriesDataByTimestampResponse readBatchDataByTimestamp(
QuerySeriesDataByTimestampRequest request) throws IOException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
index 318772f..1d89c5c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalSingleQueryManager.java
@@ -40,18 +40,19 @@ public interface IClusterLocalSingleQueryManager {
/**
* Initially create corresponding series readers.
+ *
* @param request request of querying series data
*/
InitSeriesReaderResponse createSeriesReader(InitSeriesReaderRequest request)
- throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException;
+ throws IOException, PathErrorException, FileNodeManagerException, ProcessorException, QueryFilterOptimizationException, ClassNotFoundException;
/**
* <p>
* Read batch data If query round in cache is equal to target query round, it means that batch
* data in query node transfer to coordinator fail and return cached batch data.
* </p>
- * @param request request of querying series data
*
+ * @param request request of querying series data
*/
QuerySeriesDataResponse readBatchData(QuerySeriesDataRequest request)
throws IOException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
index 3f21835..1cd357e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFilterSeriesBatchReader.java
@@ -68,12 +68,12 @@ public class ClusterFilterSeriesBatchReader implements IClusterFilterSeriesBatch
batchDataList.add(new BatchData(dataTypeList.get(i), true));
}
int dataPointCount = 0;
- while(true){
- if(!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()){
+ while (true) {
+ if (!hasNext() || dataPointCount == CLUSTER_CONF.getBatchReadSize()) {
break;
}
- if(hasNext() && addTimeValuePair(batchDataList, dataTypeList)){
- dataPointCount++;
+ if (hasNext() && addTimeValuePair(batchDataList, dataTypeList)) {
+ dataPointCount++;
}
}
return batchDataList;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index 424ba95..dca2d30 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.query.utils;
import com.alipay.sofa.jraft.entity.PeerId;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -62,7 +63,7 @@ public class ClusterRpcReaderUtils {
*/
public static BasicResponse createClusterSeriesReader(String groupId, PeerId peerId,
int readDataConsistencyLevel, Map<PathType, QueryPlan> allQueryPlan, String taskId,
- List<Filter> filterList) throws RaftConnectionException {
+ List<Filter> filterList) throws RaftConnectionException, IOException {
/** handle request **/
BasicRequest request = InitSeriesReaderRequest
@@ -71,14 +72,35 @@ public class ClusterRpcReaderUtils {
return handleQueryRequest(request, peerId, 0);
}
- public static QuerySeriesDataResponse fetchBatchData(String groupID, PeerId peerId, String taskId,
+ /**
+ * Fetch batch data for select series in a query without value filter or filter series.
+ *
+ * @param groupId data group id
+ * @param peerId query node id
+ * @param taskId task id of query task
+ * @param pathType type of path
+ * @param fetchDataSeries series list which need to fetch data
+ * @param queryRounds query rounds
+ */
+ public static QuerySeriesDataResponse fetchBatchData(String groupId, PeerId peerId, String taskId,
PathType pathType, List<String> fetchDataSeries, long queryRounds)
throws RaftConnectionException {
BasicRequest request = QuerySeriesDataRequest
- .createFetchDataRequest(groupID, taskId, pathType, fetchDataSeries, queryRounds);
+ .createFetchDataRequest(groupId, taskId, pathType, fetchDataSeries, queryRounds);
return (QuerySeriesDataResponse) handleQueryRequest(request, peerId, 0);
}
+ /**
+ * Fetch batch data corresponding to a given list of timestamp for select series in a query with
+ * value filter.
+ *
+ * @param groupId data group id
+ * @param peerId query node id
+ * @param taskId task id of query task
+ * @param queryRounds query rounds
+ * @param batchTimestamp list of valid timestamp
+ * @param fetchDataSeries series list which need to fetch data
+ */
public static QuerySeriesDataByTimestampResponse fetchBatchDataByTimestamp(String groupId,
PeerId peerId, String taskId, long queryRounds, List<Long> batchTimestamp,
List<String> fetchDataSeries)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index fc0d401..5fbd30c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -156,7 +156,7 @@ public class QueryPlanPartitionUtils {
throws PathErrorException {
AggregationPlan queryPlan = (AggregationPlan) singleQueryManager.getOriginQueryPlan();
List<Path> selectPaths = queryPlan.getPaths();
- List<String> aggregations = new ArrayList<>();
+ List<String> aggregations = queryPlan.getAggregations();
Map<String, List<Path>> selectSeriesByGroupId = singleQueryManager.getSelectSeriesByGroupId();
Map<String, List<String>> selectAggregationByGroupId = new HashMap<>();
Map<String, QueryPlan> selectPathPlans = singleQueryManager.getSelectPathPlans();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java
index c974e2f..e28ac15 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/request/querydata/InitSeriesReaderRequest.java
@@ -18,10 +18,16 @@
*/
package org.apache.iotdb.cluster.rpc.raft.request.querydata;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.iotdb.cluster.query.PathType;
import org.apache.iotdb.cluster.rpc.raft.request.BasicQueryRequest;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -42,12 +48,12 @@ public class InitSeriesReaderRequest extends BasicQueryRequest {
/**
* Key is series type, value is query plan
*/
- private Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
+ private Map<PathType, byte[]> allQueryPlan = new EnumMap<>(PathType.class);
/**
* Represent all filter of leaf node in filter tree while executing a query with value filter.
*/
- private List<Filter> filterList = new ArrayList<>();
+ private List<byte[]> filterList = new ArrayList<>();
private InitSeriesReaderRequest(String groupID, String taskId) {
@@ -55,12 +61,17 @@ public class InitSeriesReaderRequest extends BasicQueryRequest {
this.taskId = taskId;
}
- public static InitSeriesReaderRequest createInitialQueryRequest(String groupId, String taskId, int readConsistencyLevel,
- Map<PathType, QueryPlan> allQueryPlan, List<Filter> filterList){
+ public static InitSeriesReaderRequest createInitialQueryRequest(String groupId, String taskId,
+ int readConsistencyLevel,
+ Map<PathType, QueryPlan> allQueryPlan, List<Filter> filterList) throws IOException {
InitSeriesReaderRequest request = new InitSeriesReaderRequest(groupId, taskId);
request.setReadConsistencyLevel(readConsistencyLevel);
- request.allQueryPlan = allQueryPlan;
- request.filterList = filterList;
+ for (Entry<PathType, QueryPlan> entry : allQueryPlan.entrySet()) {
+ request.allQueryPlan.put(entry.getKey(), toByteArray(entry.getValue()));
+ }
+ for (Filter filter : filterList) {
+ request.filterList.add(toByteArray(filter));
+ }
return request;
}
@@ -72,20 +83,51 @@ public class InitSeriesReaderRequest extends BasicQueryRequest {
this.taskId = taskId;
}
- public Map<PathType, QueryPlan> getAllQueryPlan() {
- return allQueryPlan;
+ public Map<PathType, QueryPlan> getAllQueryPlan() throws IOException, ClassNotFoundException {
+ Map<PathType, QueryPlan> queryPlanMap = new EnumMap<>(PathType.class);
+ for (Entry<PathType, byte[]> entry : allQueryPlan.entrySet()) {
+ queryPlanMap.put(entry.getKey(), (QueryPlan) toObject(entry.getValue()));
+ }
+ return queryPlanMap;
}
- public void setAllQueryPlan(
- Map<PathType, QueryPlan> allQueryPlan) {
- this.allQueryPlan = allQueryPlan;
+ public List<Filter> getFilterList() throws IOException, ClassNotFoundException {
+ List<Filter> filters = new ArrayList<>();
+ for (byte[] filterBytes : filterList) {
+ filters.add((Filter) toObject(filterBytes));
+ }
+ return filters;
}
- public List<Filter> getFilterList() {
- return filterList;
+ /**
+ * Convert an object to byte array
+ *
+ * @param obj Object, which need to implement Serializable
+ * @return byte array of object
+ */
+ private static byte[] toByteArray(Object obj) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(obj);
+ oos.flush();
+ byte[] bytes = bos.toByteArray();
+ oos.close();
+ bos.close();
+ return bytes;
}
- public void setFilterList(List<Filter> filterList) {
- this.filterList = filterList;
+ /**
+ * Convert byte array back to Object
+ *
+ * @param bytes byte array of object
+ * @return object
+ */
+ private static Object toObject(byte[] bytes) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ Object obj = ois.readObject();
+ ois.close();
+ bis.close();
+ return obj;
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java
new file mode 100644
index 0000000..71cf523
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/Constant.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.integration;
+
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+
+public class Constant {
+
+ public static final String d0s0 = "root.vehicle.d0.s0";
+ public static final String d0s1 = "root.vehicle.d0.s1";
+ public static final String d0s2 = "root.vehicle.d0.s2";
+ public static final String d0s3 = "root.vehicle.d0.s3";
+ public static final String d0s4 = "root.vehicle.d0.s4";
+ public static final String d0s5 = "root.vehicle.d0.s5";
+ public static final String d1s0 = "root.vehicle.d1.s0";
+ public static final String d1s1 = "root.vehicle.d1.s1";
+ public static final String TIMESTAMP_STR = "Time";
+ public static boolean testFlag = true;
+ public static String[] stringValue = new String[]{"A", "B", "C", "D", "E"};
+ public static String[] booleanValue = new String[]{"true", "false"};
+
+ public static String[] create_sql = new String[]{"SET STORAGE GROUP TO root.vehicle",
+
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.vehicle.d0.s5 WITH DATATYPE=DOUBLE, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+
+ };
+
+ public static String insertTemplate = "insert into %s(timestamp%s) values(%d%s)";
+
+ public static String first(String path) {
+ return String.format("first(%s)", path);
+ }
+
+ public static String last(String path) {
+ return String.format("last(%s)", path);
+ }
+
+ public static String sum(String path) {
+ return String.format("sum(%s)", path);
+ }
+
+ public static String mean(String path) {
+ return String.format("mean(%s)", path);
+ }
+
+ public static String count(String path) {
+ return String.format("count(%s)", path);
+ }
+
+ public static String max_time(String path) {
+ return String.format("max_time(%s)", path);
+ }
+
+ public static String min_time(String path) {
+ return String.format("min_time(%s)", path);
+ }
+
+ public static String max_value(String path) {
+ return String.format("max_value(%s)", path);
+ }
+
+ public static String min_value(String path) {
+ return String.format("min_value(%s)", path);
+ }
+
+ public static String recordToInsert(TSRecord record) {
+ StringBuilder measurements = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ for (DataPoint dataPoint : record.dataPointList) {
+ measurements.append(",").append(dataPoint.getMeasurementId());
+ values.append(",").append(dataPoint.getValue());
+ }
+ return String
+ .format(insertTemplate, record.deviceId, measurements.toString(), record.time, values);
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java
new file mode 100644
index 0000000..bf7c4da
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationIT.java
@@ -0,0 +1,640 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.integration;
+
+import static org.apache.iotdb.cluster.integration.Constant.count;
+import static org.apache.iotdb.cluster.integration.Constant.first;
+import static org.apache.iotdb.cluster.integration.Constant.last;
+import static org.apache.iotdb.cluster.integration.Constant.max_time;
+import static org.apache.iotdb.cluster.integration.Constant.max_value;
+import static org.apache.iotdb.cluster.integration.Constant.mean;
+import static org.apache.iotdb.cluster.integration.Constant.min_time;
+import static org.apache.iotdb.cluster.integration.Constant.min_value;
+import static org.apache.iotdb.cluster.integration.Constant.sum;
+import static org.apache.iotdb.cluster.utils.Utils.insertData;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBAggregationIT {
+
+ private Server server;
+ private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
+ private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
+ CLUSTER_CONFIG.getPort());
+
+ private static String[] creationSqls = new String[]{
+ "SET STORAGE GROUP TO root.vehicle.d0",
+ "SET STORAGE GROUP TO root.vehicle.d1",
+
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN"
+ };
+
+ private static String[] dataSet2 = new String[]{
+ "SET STORAGE GROUP TO root.ln.wf01.wt01",
+ "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(1, 1.1, false, 11)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(2, 2.2, true, 22)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(3, 3.3, false, 33 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(4, 4.4, false, 44)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+ + "values(5, 5.5, false, 55)"
+ };
+
+ private String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3)"
+ + " VALUES(%d,%d,%d,%f,%s)";
+
+ private static final String TIMESTAMP_STR = "Time";
+ private final String d0s0 = "root.vehicle.d0.s0";
+ private final String d0s1 = "root.vehicle.d0.s1";
+ private final String d0s2 = "root.vehicle.d0.s2";
+ private final String d0s3 = "root.vehicle.d0.s3";
+ private static final String TEMPERATURE_STR = "root.ln.wf01.wt01.temperature";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.closeMemControl();
+ CLUSTER_CONFIG.createAllPath();
+ server = Server.getInstance();
+ server.start();
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort());
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void test() throws SQLException {
+ String[] retArray = new String[]{
+ "0,2",
+ "0,4",
+ "0,3"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute(
+ "select count(temperature) from root.ln.wf01.wt01 where time > 3");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+ resultSet.getString(count(TEMPERATURE_STR));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute(
+ "select min_time(temperature) from root.ln.wf01.wt01 where time > 3");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+ resultSet.getString(min_time(TEMPERATURE_STR));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute(
+ "select min_time(temperature) from root.ln.wf01.wt01 where temperature > 3");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
+ resultSet.getString(min_time(TEMPERATURE_STR));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(3, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void remoteTest() throws SQLException {
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ test();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void countTest() throws SQLException {
+ String[] retArray = new String[]{
+ "0,2001,2001,2001,2001",
+ "0,7500,7500,7500,7500"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3) " +
+ "from root.vehicle.d0 where time >= 6000 and time <= 9000");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0))
+ + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2))
+ + "," + resultSet.getString(count(d0s3));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select count(s0),count(s1),count(s2),count(s3) " +
+ "from root.vehicle.d0");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(count(d0s0))
+ + "," + resultSet.getString(count(d0s1)) + "," + resultSet.getString(count(d0s2))
+ + "," + resultSet.getString(count(d0s3));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void countRemoteTest() throws SQLException {
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ countTest();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ @Test
+ public void firstTest() throws SQLException {
+ String[] retArray = new String[]{
+ "0,2000,2000,2000.0,2000",
+ "0,500,500,500.0,500"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select first(s0),first(s1),first(s2),first(s3) " +
+ "from root.vehicle.d0 where time >= 1500 and time <= 9000");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0))
+ + "," + resultSet.getString(first(d0s1)) + "," + resultSet.getString(first(d0s2))
+ + "," + resultSet.getString(first(d0s3));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select first(s0),first(s1),first(s2),first(s3) " +
+ "from root.vehicle.d0");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(first(d0s0))
+ + "," + resultSet.getString(first(d0s1)) + "," + resultSet.getString(first(d0s2))
+ + "," + resultSet.getString(first(d0s3));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void firstRemoteTest() throws SQLException {
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ firstTest();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void lastTest() throws SQLException {
+ String[] retArray = new String[]{
+ "0,8499,8499.0",
+ "0,1499,1499.0",
+ "0,2200,2200.0"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select last(s0),last(s2) " +
+ "from root.vehicle.d0 where time >= 1500 and time < 9000");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0))
+ + "," + resultSet.getString(last(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select last(s0),last(s2) " +
+ "from root.vehicle.d0 where time <= 1600");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0))
+ + "," + resultSet.getString(last(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select last(s0),last(s2) " +
+ "from root.vehicle.d0 where time <= 2200");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(last(d0s0))
+ + "," + resultSet.getString(last(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(3, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void lastRemoteTest() throws SQLException {
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ lastTest();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void maxminTimeTest() throws SQLException {
+ String[] retArray = new String[]{
+ "0,8499,500",
+ "0,2499,2000"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select max_time(s0),min_time(s2) " +
+ "from root.vehicle.d0 where time >= 100 and time < 9000");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0))
+ + "," + resultSet.getString(min_time(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select max_time(s0),min_time(s2) " +
+ "from root.vehicle.d0 where time <= 2500 and time > 1800");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_time(d0s0))
+ + "," + resultSet.getString(min_time(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void maxminTimeRemoteTest() throws SQLException {
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ maxminTimeTest();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void maxminValueTest() throws SQLException {
+ String[] retArray = new String[]{
+ "0,8499,500.0",
+ "0,2499,500.0"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select max_value(s0),min_value(s2) " +
+ "from root.vehicle.d0 where time >= 100 and time < 9000");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_value(d0s0))
+ + "," + resultSet.getString(min_value(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select max_value(s0),min_value(s2) " +
+ "from root.vehicle.d0 where time < 2500");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(max_value(d0s0))
+ + "," + resultSet.getString(min_value(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void maxminValueRemoteTest() throws SQLException {
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ maxminTimeTest();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void meanSumTest() throws SQLException {
+ String[] retArray = new String[]{
+ "0,1.4508E7,7250.374812593703",
+ "0,626750.0,1250.998003992016"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select sum(s0),mean(s2)" +
+ "from root.vehicle.d0 where time >= 6000 and time <= 9000");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0))
+ + "," + resultSet.getString(mean(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select sum(s0),mean(s2)" +
+ "from root.vehicle.d0 where time >= 1000 and time <= 2000");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(sum(d0s0))
+ + "," + resultSet.getString(mean(d0s2));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ statement.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void meanSumRemoteTest() throws SQLException {
+ QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+ meanSumTest();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void prepareData() throws SQLException {
+ Connection connection = null;
+ try {
+ connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement();
+ insertData(connection, creationSqls, dataSet2);
+ // prepare BufferWrite file
+ for (int i = 5000; i < 7000; i++) {
+ statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("flush");
+ for (int i = 7500; i < 8500; i++) {
+ statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("flush");
+
+ // prepare Unseq-File
+ for (int i = 500; i < 1500; i++) {
+ statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("flush");
+ for (int i = 3000; i < 6500; i++) {
+ statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("merge");
+
+ // prepare BufferWrite cache
+ for (int i = 9000; i < 10000; i++) {
+ statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ // prepare Overflow cache
+ for (int i = 2000; i < 2500; i++) {
+ statement.addBatch(String.format(insertTemplate, i, i, i, (double) i, "\'" + i + "\'"));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
index f5bf17f..ba8746d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
@@ -40,7 +40,6 @@ import org.junit.Test;
public class IoTDBFillQueryIT {
-
private Server server;
private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
private static final PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 6c458a5..401b056 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -50,18 +50,22 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+/**
+ * Handle aggregation query and construct dataset
+ */
public class AggregateEngineExecutor {
protected List<Path> selectedSeries;
protected List<String> aggres;
protected IExpression expression;
- protected List<TSDataType> dataTypes;
+ private List<TSDataType> dataTypes;
/**
* aggregation batch calculation size.
**/
- private int aggregateFetchSize;
+ protected int aggregateFetchSize;
/**
* constructor.
@@ -317,9 +321,9 @@ public class AggregateEngineExecutor {
/**
* calculation aggregate result with value filter.
*/
- private List<AggreResultData> aggregateWithTimeGenerator(
+ protected List<AggreResultData> aggregateWithTimeGenerator(
List<AggregateFunction> aggregateFunctions,
- EngineTimeGenerator timestampGenerator,
+ TimeGenerator timestampGenerator,
List<EngineReaderByTimeStamp> readersOfSelectedSeries)
throws IOException {
@@ -335,6 +339,8 @@ public class AggregateEngineExecutor {
timeArray[timeArrayLength++] = timestampGenerator.next();
}
+
+
// cal part of aggregate result
for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
aggregateFunctions.get(i).calcAggregationUsingTimestamps(timeArray, timeArrayLength,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
index c23c2b9..5d83314 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/AbstractNodeConstructor.java
@@ -58,10 +58,7 @@ public abstract class AbstractNodeConstructor {
/**
* Construct not series type node.
-<<<<<<< HEAD
-=======
*
->>>>>>> master
* @param expression expression
* @return Node object
* @throws FileNodeManagerException FileNodeManagerException