You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/01 01:04:41 UTC
[iotdb] branch master updated: [IOTDB-1940] Support aggregation query with value filter in new vector (#4470)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f43085e [IOTDB-1940] Support aggregation query with value filter in new vector (#4470)
f43085e is described below
commit f43085ee9c583e319cc621177aa3e95f14502715
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Dec 1 09:04:08 2021 +0800
[IOTDB-1940] Support aggregation query with value filter in new vector (#4470)
---
.../db/query/aggregation/AggregateResult.java | 4 +-
.../db/query/aggregation/impl/AvgAggrResult.java | 9 +-
.../db/query/aggregation/impl/CountAggrResult.java | 11 +-
.../query/aggregation/impl/ExtremeAggrResult.java | 7 +-
.../aggregation/impl/FirstValueAggrResult.java | 11 +-
.../aggregation/impl/FirstValueDescAggrResult.java | 7 +-
.../aggregation/impl/LastValueAggrResult.java | 7 +-
.../aggregation/impl/LastValueDescAggrResult.java | 12 +-
.../query/aggregation/impl/MaxTimeAggrResult.java | 5 +-
.../aggregation/impl/MaxTimeDescAggrResult.java | 10 +-
.../query/aggregation/impl/MaxValueAggrResult.java | 10 +-
.../query/aggregation/impl/MinTimeAggrResult.java | 10 +-
.../aggregation/impl/MinTimeDescAggrResult.java | 5 +-
.../query/aggregation/impl/MinValueAggrResult.java | 10 +-
.../db/query/aggregation/impl/SumAggrResult.java | 9 +-
.../db/query/executor/AggregationExecutor.java | 116 ++++++---
.../iotdb/db/utils/AlignedValueIterator.java | 58 +++++
.../org/apache/iotdb/db/utils/ValueIterator.java | 56 +++++
.../IoTDBAggregationWithValueFilter2IT.java | 63 +++++
.../aligned/IoTDBAggregationWithValueFilterIT.java | 262 +++++++++++++++++++++
...BAggregationWithValueFilterWithDeletion2IT.java | 83 +++++++
...DBAggregationWithValueFilterWithDeletionIT.java | 219 +++++++++++++++++
22 files changed, 886 insertions(+), 98 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index efd9aee..7daa1e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -98,7 +99,8 @@ public abstract class AggregateResult {
long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException;
/** This method calculates the aggregation using values that have been calculated */
- public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values);
+ public abstract void updateResultUsingValues(
+ long[] timestamps, int length, ValueIterator valueIterator);
/**
* Judge if aggregation results have been calculated. In other words, if the aggregated result
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index a1fffa6..aebe259 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -110,11 +111,9 @@ public class AvgAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- updateAvg(seriesDataType, values[i]);
- }
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+ while (valueIterator.hasNext()) {
+ updateAvg(seriesDataType, valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 57a3a7e..971c76a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -57,7 +58,6 @@ public class CountAggrResult extends AggregateResult {
public void updateResultFromPageData(
IBatchDataIterator batchIterator, long minBound, long maxBound) {
int cnt = 0;
- int count = batchIterator.totalLength();
while (batchIterator.hasNext()) {
if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) {
break;
@@ -82,12 +82,11 @@ public class CountAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
int cnt = 0;
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- cnt++;
- }
+ while (valueIterator.hasNext()) {
+ valueIterator.next();
+ cnt++;
}
setLongValue(getLongValue() + cnt);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
index 144fe00..0a0c165 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
@@ -125,10 +126,10 @@ public class ExtremeAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
Comparable<Object> extVal = null;
- for (int i = 0; i < length; i++) {
- extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+ while (valueIterator.hasNext()) {
+ extVal = getExtremeValue(extVal, (Comparable<Object>) valueIterator.next());
}
updateResult(extVal);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 233abdc..be5bd9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -107,16 +108,12 @@ public class FirstValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- setValue(values[i]);
- timestamp = timestamps[i];
- break;
- }
+ if (valueIterator.hasNext()) {
+ setValue(valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index 3092818..afcd23f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -65,10 +66,10 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
- setValue(values[i]);
+ if (valueIterator.get(i) != null) {
+ setValue(valueIterator.get(i));
timestamp = timestamps[i];
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 443751c..d369f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -97,11 +98,11 @@ public class LastValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
+ if (valueIterator.get(i) != null) {
timestamp = timestamps[i];
- setValue(values[i]);
+ setValue(valueIterator.get(i));
return;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 981167b..7bc236c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -87,16 +88,13 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- timestamp = timestamps[i];
- setValue(values[i]);
- return;
- }
+ if (valueIterator.hasNext()) {
+ timestamp = timestamps[valueIterator.getCurPos()];
+ setValue(valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 46ebe15..d990b8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -77,9 +78,9 @@ public class MaxTimeAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
+ if (valueIterator.get(i) != null) {
updateMaxTimeResult(timestamps[i]);
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e867bf7..1dfc1ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -70,15 +71,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- updateMaxTimeResult(timestamps[i]);
- return;
- }
+ if (valueIterator.hasNext()) {
+ updateMaxTimeResult(timestamps[valueIterator.getCurPos()]);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index a61583b..f939d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -83,11 +84,12 @@ public class MaxValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
Comparable<Object> maxVal = null;
- for (int i = 0; i < length; i++) {
- if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) {
- maxVal = (Comparable<Object>) values[i];
+ while (valueIterator.hasNext()) {
+ Object value = valueIterator.next();
+ if (maxVal == null || maxVal.compareTo(value) < 0) {
+ maxVal = (Comparable<Object>) value;
}
}
updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index 4d0365f..0cf2205 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -92,15 +93,12 @@ public class MinTimeAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
if (hasFinalResult()) {
return;
}
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- setLongValue(timestamps[i]);
- return;
- }
+ if (valueIterator.hasNext()) {
+ setLongValue(timestamps[valueIterator.getCurPos()]);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 071d3d2..6fe0fdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -54,9 +55,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
for (int i = length - 1; i >= 0; i--) {
- if (values[i] != null) {
+ if (valueIterator.get(i) != null) {
setLongValue(timestamps[i]);
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index f3c01ed..8b9e1e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
@@ -78,11 +79,12 @@ public class MinValueAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
Comparable<Object> minVal = null;
- for (int i = 0; i < length; i++) {
- if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) {
- minVal = (Comparable<Object>) values[i];
+ while (valueIterator.hasNext()) {
+ Object value = valueIterator.next();
+ if (minVal == null || minVal.compareTo(value) > 0) {
+ minVal = (Comparable<Object>) value;
}
}
updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index a047c33..46cd1b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
@@ -91,11 +92,9 @@ public class SumAggrResult extends AggregateResult {
}
@Override
- public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
- for (int i = 0; i < length; i++) {
- if (values[i] != null) {
- updateSum(values[i]);
- }
+ public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) {
+ while (valueIterator.hasNext()) {
+ updateSum(valueIterator.next());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 53c218a..1b10d53 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -44,7 +44,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.utils.AlignedValueIterator;
import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -55,10 +57,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -82,7 +86,10 @@ public class AggregationExecutor {
private int aggregateFetchSize;
protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) {
- this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
+ this.selectedSeries = new ArrayList<>();
+ aggregationPlan
+ .getDeduplicatedPaths()
+ .forEach(k -> selectedSeries.add(((MeasurementPath) k).transformToExactPath()));
this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
this.aggregations = aggregationPlan.getDeduplicatedAggregations();
this.expression = aggregationPlan.getExpression();
@@ -588,18 +595,26 @@ public class AggregationExecutor {
// group by path name
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
groupAggregationsBySeries(selectedSeries);
- Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
+ Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap =
+ groupVectorSeries(pathToAggrIndexesMap);
+ Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();
List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
+
try {
- for (int i = 0; i < selectedSeries.size(); i++) {
- PartialPath path = selectedSeries.get(i);
- List<Integer> indexes = pathToAggrIndexesMap.remove(path);
- if (indexes != null) {
- IReaderByTimestamp seriesReaderByTimestamp =
- getReaderByTime(path, queryPlan, dataTypes.get(i), context);
- readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
- }
+ for (PartialPath path : pathToAggrIndexesMap.keySet()) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(path, queryPlan, path.getSeriesType(), context);
+ readerToAggrIndexesMap.put(
+ seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path)));
+ }
+ // assign null to be friendly for GC
+ pathToAggrIndexesMap = null;
+ for (PartialPath vectorPath : vectorPathIndexesMap.keySet()) {
+ IReaderByTimestamp seriesReaderByTimestamp =
+ getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context);
+ readerToAggrIndexesMap.put(seriesReaderByTimestamp, vectorPathIndexesMap.get(vectorPath));
}
+ vectorPathIndexesMap = null;
} finally {
StorageEngine.getInstance().mergeUnLock(list);
}
@@ -649,7 +664,7 @@ public class AggregationExecutor {
/** calculate aggregation result with value filter. */
private void aggregateWithValueFilter(
TimeGenerator timestampGenerator,
- Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
+ Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap)
throws IOException {
List<Boolean> cached =
markFilterdPaths(
@@ -668,23 +683,40 @@ public class AggregationExecutor {
}
// cal part of aggregate result
- for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
- int pathId = entry.getValue().get(0);
- // cache in timeGenerator
- if (cached.get(pathId)) {
- Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
- for (Integer i : entry.getValue()) {
- aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+ for (Entry<IReaderByTimestamp, List<List<Integer>>> entry :
+ readerToAggrIndexesMap.entrySet()) {
+ // use cache data as much as possible
+ boolean[] cachedOrNot = new boolean[entry.getValue().size()];
+ for (int i = 0; i < entry.getValue().size(); i++) {
+ List<Integer> subIndexes = entry.getValue().get(i);
+ int pathId = subIndexes.get(0);
+ // if cached in timeGenerator
+ if (cached.get(pathId)) {
+ Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
+ ValueIterator valueIterator = generateValueIterator(values);
+ for (Integer index : subIndexes) {
+ aggregateResultList[index].updateResultUsingValues(
+ timeArray, timeArrayLength, valueIterator);
+ valueIterator.reset();
+ }
+ cachedOrNot[i] = true;
}
- } else {
- if (entry.getValue().size() == 1) {
- aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps(
- timeArray, timeArrayLength, entry.getKey());
- } else {
- Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
- if (values != null) {
- for (Integer i : entry.getValue()) {
- aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values);
+ }
+
+ if (hasRemaining(cachedOrNot)) {
+ // TODO: if we only need to get firstValue/minTime that's not need to traverse all values,
+ // it's enough to get the exact number of values for these specific aggregate func
+ Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
+ if (values != null) {
+ ValueIterator valueIterator = generateValueIterator(values);
+ for (int i = 0; i < entry.getValue().size(); i++) {
+ if (!cachedOrNot[i]) {
+ valueIterator.setSubMeasurementIndex(i);
+ for (Integer index : entry.getValue().get(i)) {
+ aggregateResultList[index].updateResultUsingValues(
+ timeArray, timeArrayLength, valueIterator);
+ valueIterator.reset();
+ }
}
}
}
@@ -693,6 +725,24 @@ public class AggregationExecutor {
}
}
+ private ValueIterator generateValueIterator(Object[] values) {
+ if (values[0] instanceof TsPrimitiveType[]) {
+ return new AlignedValueIterator(values);
+ } else {
+ return new ValueIterator(values);
+ }
+ }
+
+ /** Return whether there is result that has not been cached */
+ private boolean hasRemaining(boolean[] cachedOrNot) {
+ for (int i = 0; i < cachedOrNot.length; i++) {
+ if (!cachedOrNot[i]) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* using aggregate result data list construct QueryDataSet.
*
@@ -755,21 +805,19 @@ public class AggregationExecutor {
List<PartialPath> seriesPaths = new ArrayList<>(pathToAggrIndexesMap.keySet());
for (PartialPath seriesPath : seriesPaths) {
- if (seriesPath instanceof MeasurementPath
- && ((MeasurementPath) seriesPath).isUnderAlignedEntity()) {
+ if (seriesPath instanceof AlignedPath) {
List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
- AlignedPath exactPath = (AlignedPath) ((MeasurementPath) seriesPath).transformToExactPath();
- AlignedPath groupPath = temp.get(exactPath.getFullPath());
+ AlignedPath groupPath = temp.get(seriesPath.getFullPath());
if (groupPath == null) {
- groupPath = exactPath;
+ groupPath = (AlignedPath) seriesPath;
temp.put(groupPath.getFullPath(), groupPath);
result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
} else {
// groupPath is changed here so we update it
List<List<Integer>> subIndexes = result.remove(groupPath);
subIndexes.add(indexes);
- groupPath.addMeasurements(exactPath.getMeasurementList());
- groupPath.addSchemas(exactPath.getSchemaList());
+ groupPath.addMeasurements(((AlignedPath) seriesPath).getMeasurementList());
+ groupPath.addSchemas(((AlignedPath) seriesPath).getSchemaList());
result.put(groupPath, subIndexes);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
new file mode 100644
index 0000000..5d34d4d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+/** Used for value object is instance of TsPrimitiveType[] */
+public class AlignedValueIterator extends ValueIterator {
+
+ int subMeasurementIndex;
+
+ public AlignedValueIterator(Object[] values) {
+ super(values);
+ }
+
+ public void setSubMeasurementIndex(int subMeasurementIndex) {
+ this.subMeasurementIndex = subMeasurementIndex;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (curPos < values.length
+ && ((TsPrimitiveType[]) values[curPos])[subMeasurementIndex] == null) {
+ curPos++;
+ }
+ return curPos < values.length;
+ }
+
+ @Override
+ public Object next() {
+ return ((TsPrimitiveType[]) values[curPos++])[subMeasurementIndex].getValue();
+ }
+
+ @Override
+ public Object get(int index) {
+ if (((TsPrimitiveType[]) values[index])[subMeasurementIndex] == null) {
+ return null;
+ }
+ return ((TsPrimitiveType[]) values[index])[subMeasurementIndex].getValue();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
new file mode 100644
index 0000000..d870f7f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+public class ValueIterator {
+
+ // Object: TsPrimitiveType[] or common data type
+ protected Object[] values;
+ protected int curPos = 0;
+
+ public ValueIterator(Object[] values) {
+ this.values = values;
+ }
+
+ public boolean hasNext() {
+ while (curPos < values.length && values[curPos] == null) {
+ curPos++;
+ }
+ return curPos < values.length;
+ }
+
+ public void setSubMeasurementIndex(int subMeasurementIndex) {}
+
+ public Object next() {
+ return values[curPos++];
+ }
+
+ public Object get(int index) {
+ return values[index];
+ }
+
+ public int getCurPos() {
+ return curPos;
+ }
+
+ public void reset() {
+ this.curPos = 0;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilter2IT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilter2IT.java
new file mode 100644
index 0000000..40986d9
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilter2IT.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/** Let One chunk has more than one page. */
+public class IoTDBAggregationWithValueFilter2IT extends IoTDBAggregationWithValueFilterIT {
+ private static int numOfPointsPerPage;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ // TODO When the aligned time series support compaction, we need to set compaction to true
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+ AlignedWriteUtil.insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+ EnvironmentUtils.cleanEnv();
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterIT.java
new file mode 100644
index 0000000..75f7882
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterIT.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBAggregationWithValueFilterIT {
+
+ private static final double DELTA = 1e-6;
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ // TODO When the aligned time series support compaction, we need to set compaction to true
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+ AlignedWriteUtil.insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void countAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"11"};
+ String[] columnNames = {"count(root.sg1.d1.s4)"};
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select count(s4) from root.sg1.d1 where s4 = true");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void aggregationFuncAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"8", "42.0", "5.25", "1.0", "9.0", "1", "9", "1.0", "9.0"};
+ String[] columnNames = {
+ "count(root.sg1.d1.s1)",
+ "sum(root.sg1.d1.s1)",
+ "avg(root.sg1.d1.s1)",
+ "first_value(root.sg1.d1.s1)",
+ "last_value(root.sg1.d1.s1)",
+ "min_time(root.sg1.d1.s1)",
+ "max_time(root.sg1.d1.s1)",
+ "min_value(root.sg1.d1.s1)",
+ "max_value(root.sg1.d1.s1)",
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), sum(s1), avg(s1), "
+ + "first_value(s1), last_value(s1), "
+ + "min_time(s1), max_time(s1),"
+ + "max_value(s1), min_value(s1) from root.sg1.d1 where s1 < 10");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"6", "6", "9", "11", "6"};
+ String[] columnNames = {
+ "count(root.sg1.d1.s1)",
+ "count(root.sg1.d1.s2)",
+ "count(root.sg1.d1.s3)",
+ "count(root.sg1.d1.s4)",
+ "count(root.sg1.d1.s5)"
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select count(*) from root.sg1.d1 where s4 = true");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void aggregationAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"160016.0", "11", "1", "13"};
+ String[] columnNames = {
+ "sum(root.sg1.d1.s1)",
+ "count(root.sg1.d1.s4)",
+ "min_value(root.sg1.d1.s3)",
+ "max_time(root.sg1.d1.s2)",
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select sum(s1), count(s4), min_value(s3), max_time(s2) from root.sg1.d1 where s4 = true");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletion2IT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletion2IT.java
new file mode 100644
index 0000000..85ddd6b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletion2IT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+/** Let One chunk has more than one page. */
+public class IoTDBAggregationWithValueFilterWithDeletion2IT
+ extends IoTDBAggregationWithValueFilterWithDeletionIT {
+
+ private static int numOfPointsPerPage;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+ AlignedWriteUtil.insertData();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ // TODO currently aligned data in memory doesn't support deletion, so we flush all data to
+ // disk before doing deletion
+ statement.execute("flush");
+ statement.execute("delete timeseries root.sg1.d1.s2");
+ statement.execute("delete from root.sg1.d1.s1 where time <= 21");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+ EnvironmentUtils.cleanEnv();
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletionIT.java
new file mode 100644
index 0000000..fb9fff0
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithValueFilterWithDeletionIT.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class IoTDBAggregationWithValueFilterWithDeletionIT {
+
+ private static final double DELTA = 1e-6;
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+
+ enableSeqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
+ IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+
+ AlignedWriteUtil.insertData();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ // TODO currently aligned data in memory doesn't support deletion, so we flush all data to
+ // disk before doing deletion
+ statement.execute("flush");
+ statement.execute("delete timeseries root.sg1.d1.s2");
+ statement.execute("delete from root.sg1.d1.s1 where time <= 21");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void countAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"11"};
+ String[] columnNames = {"count(root.sg1.d1.s4)"};
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select count(s4) from root.sg1.d1 where s4 = true");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void countAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"0", "9", "11", "6"};
+ String[] columnNames = {
+ "count(root.sg1.d1.s1)",
+ "count(root.sg1.d1.s3)",
+ "count(root.sg1.d1.s4)",
+ "count(root.sg1.d1.s5)"
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select count(*) from root.sg1.d1 where s4 = true");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void aggregationAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"0", "25", "1"};
+ String[] columnNames = {
+ "count(root.sg1.d1.s1)", "max_time(root.sg1.d1.s4)", "min_value(root.sg1.d1.s3)",
+ };
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select count(s1), max_time(s4), min_value(s3) from root.sg1.d1 where s4 = true");
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ map.put(resultSetMetaData.getColumnName(i), i);
+ }
+ assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] ans = new String[columnNames.length];
+ // No need to add time column for aggregation query
+ for (int i = 0; i < columnNames.length; i++) {
+ String columnName = columnNames[i];
+ int index = map.get(columnName);
+ ans[i] = resultSet.getString(index);
+ }
+ assertArrayEquals(retArray, ans);
+ cnt++;
+ }
+ assertEquals(1, cnt);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}