You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/25 14:08:16 UTC
[iotdb] branch master updated: [IOTDB-2912] MPP: FilterOperator (#5645)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f7bd1ad8c3 [IOTDB-2912] MPP: FilterOperator (#5645)
f7bd1ad8c3 is described below
commit f7bd1ad8c3961fbb5d54e4a22060a745ac2ec9b0
Author: flashzxi <39...@users.noreply.github.com>
AuthorDate: Mon Apr 25 22:08:10 2022 +0800
[IOTDB-2912] MPP: FilterOperator (#5645)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../db/mpp/operator/process/FilterOperator.java | 155 +++++++++++++++++++++
.../db/mpp/operator/process/TransformOperator.java | 125 +++++++++--------
2 files changed, 224 insertions(+), 56 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterOperator.java
new file mode 100644
index 0000000000..1421bbcac6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.process;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link TransformOperator} that evaluates a filter expression alongside the output expressions
+ * and only emits rows for timestamps at which the filter evaluates to a non-null {@code true}.
+ *
+ * <p>The filter expression is appended after the output expressions before they are handed to the
+ * superclass, so the filter's reader is always the last element of {@code transformers}. It is
+ * used for row selection only and is never written to the produced {@link TsBlock}.
+ */
+public class FilterOperator extends TransformOperator {
+
+  // Deliberately declared without an initializer: the field is assigned in initTransformers(),
+  // which runs while the superclass is being constructed. Field initializers of this class run
+  // after the superclass constructor returns, so even "= null" would wipe the assigned reader.
+  private LayerPointReader filterPointReader;
+
+  /**
+   * Creates a filter operator.
+   *
+   * @param operatorContext context of this operator
+   * @param inputOperator child operator providing the input
+   * @param inputDataTypes data types of the input columns
+   * @param filterExpression expression deciding which timestamps are kept; must be BOOLEAN
+   * @param outputExpressions expressions whose values form the output columns
+   * @param udtfContext UDTF context forwarded to the superclass
+   * @throws QueryProcessException propagated from the superclass initialization
+   * @throws IOException propagated from the superclass initialization
+   */
+  public FilterOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> inputDataTypes,
+      Expression filterExpression,
+      Expression[] outputExpressions,
+      UDTFContext udtfContext)
+      throws QueryProcessException, IOException {
+    // NOTE(review): the trailing 'false' is presumably the superclass's keepNull flag - confirm
+    // against the TransformOperator constructor signature.
+    super(
+        operatorContext,
+        inputOperator,
+        inputDataTypes,
+        bindExpressions(filterExpression, outputExpressions),
+        udtfContext,
+        false);
+  }
+
+  /**
+   * Returns a new array holding all output expressions followed by the filter expression in the
+   * last slot.
+   */
+  private static Expression[] bindExpressions(
+      Expression filterExpression, Expression[] outputExpressions) {
+    Expression[] expressions = new Expression[outputExpressions.length + 1];
+    System.arraycopy(outputExpressions, 0, expressions, 0, outputExpressions.length);
+    expressions[expressions.length - 1] = filterExpression;
+    return expressions;
+  }
+
+  /**
+   * Builds the readers through the superclass, then picks the last one as the filter reader.
+   *
+   * @throws UnSupportedDataTypeException if the filter expression is not of type BOOLEAN
+   */
+  @Override
+  protected void initTransformers() throws QueryProcessException, IOException {
+    super.initTransformers();
+
+    filterPointReader = transformers[transformers.length - 1];
+    if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
+      throw new UnSupportedDataTypeException(
+          String.format(
+              "Data type of the filter expression should be BOOLEAN, but %s is received.",
+              filterPointReader.getDataType()));
+    }
+  }
+
+  /** Positions the filter reader on its first point that passes the filter, if there is one. */
+  @Override
+  protected void initLayerPointReaders() throws QueryProcessException, IOException {
+    iterateFilterReaderToNextValid();
+  }
+
+  /**
+   * Discards filter points that are null or {@code false}. Stops, without consuming it, at the
+   * first point that is a non-null {@code true}, or when the filter reader has no more points.
+   */
+  private void iterateFilterReaderToNextValid() throws QueryProcessException, IOException {
+    while (filterPointReader.next()
+        && (filterPointReader.isCurrentNull() || !filterPointReader.currentBoolean())) {
+      filterPointReader.readyForNext();
+    }
+  }
+
+  /**
+   * Builds the next block of filtered rows, containing at most {@code FETCH_SIZE} rows.
+   *
+   * <p>A row is written for a timestamp when the filter is {@code true} there and at least one
+   * output reader has a non-null point at exactly that timestamp; output readers without such a
+   * point contribute a null.
+   *
+   * @return the built block, which has no rows if nothing passed the filter
+   * @throws RuntimeException wrapping any exception raised while reading
+   */
+  @Override
+  public TsBlock next() {
+    final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+
+    // The last transformer is the filter itself and is not part of the output.
+    final int outputColumnCount = transformers.length - 1;
+
+    if (outputDataTypes == null) {
+      outputDataTypes = new ArrayList<>();
+      for (int i = 0; i < outputColumnCount; ++i) {
+        outputDataTypes.add(transformers[i].getDataType());
+      }
+    }
+    tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
+
+    final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+    try {
+      int rowCount = 0;
+      while (rowCount < FETCH_SIZE && filterPointReader.next()) {
+        final long currentTime = filterPointReader.currentTime();
+
+        // Every output reader must be moved, so no short-circuiting here.
+        boolean hasAtLeastOneValid = false;
+        for (int i = 0; i < outputColumnCount; ++i) {
+          if (seekValueReaderTo(transformers[i], currentTime)) {
+            hasAtLeastOneValid = true;
+          }
+        }
+
+        if (hasAtLeastOneValid) {
+          timeBuilder.writeLong(currentTime);
+          for (int i = 0; i < outputColumnCount; ++i) {
+            collectDataPoint(transformers[i], columnBuilders[i], currentTime);
+          }
+          ++rowCount;
+        }
+
+        // Release the filter point that was just handled. iterateFilterReaderToNextValid() only
+        // releases null/false points, so without this call the reader would stay on the same
+        // 'true' point and this loop would see the same timestamp again and never finish.
+        filterPointReader.readyForNext();
+        iterateFilterReaderToNextValid();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return tsBlockBuilder.build();
+  }
+
+  /**
+   * Discards points of {@code reader} that are null or earlier than {@code currentTime}.
+   *
+   * @return {@code true} if the reader then rests on a non-null point at exactly {@code
+   *     currentTime}; {@code false} if it rests on a later point or has no more points (in which
+   *     case its current time is not consulted)
+   */
+  private boolean seekValueReaderTo(LayerPointReader reader, long currentTime)
+      throws QueryProcessException, IOException {
+    while (reader.next()) {
+      if (!reader.isCurrentNull() && reader.currentTime() >= currentTime) {
+        return reader.currentTime() == currentTime;
+      }
+      reader.readyForNext();
+    }
+    return false;
+  }
+
+  /**
+   * Reports whether the filter reader still has a point available.
+   *
+   * @throws RuntimeException wrapping any exception raised by the filter reader
+   */
+  @Override
+  public boolean hasNext() {
+    try {
+      return filterPointReader.next();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index cd3355d395..f74e17acc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -43,29 +43,31 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
public class TransformOperator implements ProcessOperator {
- private static final int FETCH_SIZE = 10000;
+ protected static final int FETCH_SIZE = 10000;
- private final float udfReaderMemoryBudgetInMB =
+ protected final float udfReaderMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
- private final float udfTransformerMemoryBudgetInMB =
+ protected final float udfTransformerMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
- private final float udfCollectorMemoryBudgetInMB =
+ protected final float udfCollectorMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
- private final OperatorContext operatorContext;
- private final Operator inputOperator;
- private final List<TSDataType> inputDataTypes;
- private final Expression[] outputExpressions;
- private final UDTFContext udtfContext;
- private final boolean keepNull;
+ protected final OperatorContext operatorContext;
+ protected final Operator inputOperator;
+ protected final List<TSDataType> inputDataTypes;
+ protected final Expression[] outputExpressions;
+ protected final UDTFContext udtfContext;
+ protected final boolean keepNull;
- private IUDFInputDataSet inputDataset;
- private LayerPointReader[] transformers;
- private TimeSelector timeHeap;
+ protected IUDFInputDataSet inputDataset;
+ protected LayerPointReader[] transformers;
+ protected TimeSelector timeHeap;
+ protected List<TSDataType> outputDataTypes;
public TransformOperator(
OperatorContext operatorContext,
@@ -84,18 +86,18 @@ public class TransformOperator implements ProcessOperator {
initInputDataset(inputDataTypes);
initTransformers();
- initTimeHeap();
+ initLayerPointReaders();
}
// Wraps the input operator and the given input column data types in a TsBlockInputDataSet and
// stores it in the inputDataset field.
private void initInputDataset(List<TSDataType> inputDataTypes) {
inputDataset = new TsBlockInputDataSet(inputOperator, inputDataTypes);
}
- private void initTransformers() throws QueryProcessException, IOException {
+ protected void initTransformers() throws QueryProcessException, IOException {
UDFRegistrationService.getInstance().acquireRegistrationLock();
- // This statement must be surrounded by the registration lock.
- UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
try {
+ // This statement must be surrounded by the registration lock.
+ UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
// UDF executors will be initialized at the same time
transformers =
new EvaluationDAGBuilder(
@@ -113,7 +115,7 @@ public class TransformOperator implements ProcessOperator {
}
}
- private void initTimeHeap() throws QueryProcessException, IOException {
+ protected void initLayerPointReaders() throws QueryProcessException, IOException {
timeHeap = new TimeSelector(transformers.length << 1, true);
for (LayerPointReader reader : transformers) {
iterateReaderToNextValid(reader);
@@ -143,67 +145,78 @@ public class TransformOperator implements ProcessOperator {
@Override
public TsBlock next() {
final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
- tsBlockBuilder.buildValueColumnBuilders(inputDataTypes);
+
+ if (outputDataTypes == null) {
+ outputDataTypes = new ArrayList<>();
+ for (LayerPointReader reader : transformers) {
+ outputDataTypes.add(reader.getDataType());
+ }
+ }
+ tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
final int columnCount = columnBuilders.length;
- int rowCount = 0;
try {
+ int rowCount = 0;
while (rowCount < FETCH_SIZE && !timeHeap.isEmpty()) {
- long minTime = timeHeap.pollFirst();
+ final long currentTime = timeHeap.pollFirst();
- timeBuilder.writeLong(minTime);
+ // time
+ timeBuilder.writeLong(currentTime);
+ // values
for (int i = 0; i < columnCount; ++i) {
LayerPointReader reader = transformers[i];
-
- if (!reader.next() || reader.currentTime() != minTime || reader.isCurrentNull()) {
- columnBuilders[i].appendNull();
- continue;
- }
-
- TSDataType type = reader.getDataType();
- switch (type) {
- case INT32:
- columnBuilders[i].writeInt(reader.currentInt());
- break;
- case INT64:
- columnBuilders[i].writeLong(reader.currentLong());
- break;
- case FLOAT:
- columnBuilders[i].writeFloat(reader.currentFloat());
- break;
- case DOUBLE:
- columnBuilders[i].writeDouble(reader.currentDouble());
- break;
- case BOOLEAN:
- columnBuilders[i].writeBoolean(reader.currentBoolean());
- break;
- case TEXT:
- columnBuilders[i].writeBinary(reader.currentBinary());
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
- }
-
- reader.readyForNext();
-
+ collectDataPoint(reader, columnBuilders[i], currentTime);
iterateReaderToNextValid(reader);
}
++rowCount;
}
} catch (Exception e) {
- // TODO: throw here?
throw new RuntimeException(e);
}
return tsBlockBuilder.build();
}
+ protected void collectDataPoint(LayerPointReader reader, ColumnBuilder writer, long currentTime)
+ throws QueryProcessException, IOException {
+ if (!reader.next() || reader.currentTime() != currentTime || reader.isCurrentNull()) {
+ writer.appendNull();
+ return;
+ }
+
+ TSDataType type = reader.getDataType();
+ switch (type) {
+ case INT32:
+ writer.writeInt(reader.currentInt());
+ break;
+ case INT64:
+ writer.writeLong(reader.currentLong());
+ break;
+ case FLOAT:
+ writer.writeFloat(reader.currentFloat());
+ break;
+ case DOUBLE:
+ writer.writeDouble(reader.currentDouble());
+ break;
+ case BOOLEAN:
+ writer.writeBoolean(reader.currentBoolean());
+ break;
+ case TEXT:
+ writer.writeBinary(reader.currentBinary());
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+
+ reader.readyForNext();
+ }
+
@Override
public void close() throws Exception {
udtfContext.finalizeUDFExecutors(operatorContext.getOperatorId());