You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 04:35:41 UTC
[iotdb] 01/01: Operators related to last query
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16e19022380361bf456be11f4e5d0e4643497e92
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 12:35:22 2022 +0800
Operators related to last query
---
.../operator/process/LastQueryMergeOperator.java | 77 ++++++++++++
.../operator/process/UpdateLastCacheOperator.java | 133 +++++++++++++++++++++
.../operator/source/LastCacheScanOperator.java | 64 ++++++++++
3 files changed, 274 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
new file mode 100644
index 0000000000..c7e678c884
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+public class LastQueryMergeOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+
+ private final List<Operator> children;
+
+ private final int inputOperatorsCount;
+
+ private int currentIndex;
+
+ public LastQueryMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+ this.operatorContext = operatorContext;
+ this.children = children;
+ this.inputOperatorsCount = children.size();
+ this.currentIndex = 0;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return children.get(currentIndex).isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+ return children.get(currentIndex++).next();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < inputOperatorsCount && children.get(currentIndex).hasNext();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return currentIndex >= inputOperatorsCount;
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (Operator child : children) {
+ child.close();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
new file mode 100644
index 0000000000..1daab0f0f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Converts the single-row result of its child into the output format of a last query and, if
+ * requested, stores the value in the last cache.
+ *
+ * <p>The child block is read as: column 0 = last timestamp (long), column 1 = last value. The
+ * produced block has the last timestamp as time column and three TEXT columns: full path of the
+ * time series, value rendered as string, and data type name.
+ */
+public class UpdateLastCacheOperator implements ProcessOperator {
+
+  // returned whenever the child delivers an empty block
+  private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
+      new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
+          .build();
+
+  private final OperatorContext operatorContext;
+
+  private final Operator child;
+
+  // fullPath for queried time series
+  private final PartialPath fullPath;
+
+  // name of the dataType for queried time series
+  private final String dataType;
+
+  private final DataNodeSchemaCache lastCache;
+
+  // whether next() should also write the value it reads into lastCache
+  private final boolean needUpdateCache;
+
+  // reused for every result block; reset before each use
+  private final TsBlockBuilder tsBlockBuilder;
+
+  /**
+   * @param operatorContext context returned unchanged by {@link #getOperatorContext()}
+   * @param child operator producing the (time, value) row of the last point
+   * @param fullPath full path of the queried time series
+   * @param dataType data type of the queried time series; only its name is kept
+   * @param dataNodeSchemaCache cache that receives the last point when updates are enabled
+   * @param needUpdateCache whether the last point should be written to the cache
+   */
+  public UpdateLastCacheOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      PartialPath fullPath,
+      TSDataType dataType,
+      DataNodeSchemaCache dataNodeSchemaCache,
+      boolean needUpdateCache) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.fullPath = fullPath;
+    this.dataType = dataType.name();
+    this.lastCache = dataNodeSchemaCache;
+    this.needUpdateCache = needUpdateCache;
+    this.tsBlockBuilder =
+        new TsBlockBuilder(1, ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Fetches the next block of the child and reshapes it into a last-query result row.
+   *
+   * @return {@code null} if the child returned {@code null}, the shared empty block if the child
+   *     returned an empty block, otherwise a one-row block (time, timeseries, value, dataType)
+   * @throws IllegalArgumentException if the child block holds more than one row
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock res = child.next();
+    if (res == null) {
+      return null;
+    }
+    if (res.isEmpty()) {
+      return LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
+
+    // NOTE(review): a null entry at position 0 is not handled here - confirm the child can never
+    // produce one
+    long lastTime = res.getColumn(0).getLong(0);
+    TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
+
+    if (needUpdateCache) {
+      TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
+      // NOTE(review): meaning of the trailing (false, Long.MIN_VALUE) arguments is defined by
+      // DataNodeSchemaCache#updateLastCache - verify against that method
+      lastCache.updateLastCache(fullPath, timeValuePair, false, Long.MIN_VALUE);
+    }
+
+    tsBlockBuilder.reset();
+    // Time
+    tsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(fullPath.getFullPath()));
+    // value
+    tsBlockBuilder.getColumnBuilder(1).writeBinary(new Binary(lastValue.getStringValue()));
+    // dataType
+    tsBlockBuilder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+    // the builder only counts rows that were declared; without this the row written above would
+    // not be part of the block produced by build()
+    tsBlockBuilder.declarePosition();
+
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
new file mode 100644
index 0000000000..dfb6d82c5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public class LastCacheScanOperator implements SourceOperator {
+
+ private final OperatorContext operatorContext;
+ private final PlanNodeId sourceId;
+ private TsBlock tsBlock;
+
+ public LastCacheScanOperator(
+ OperatorContext operatorContext, PlanNodeId sourceId, TsBlock tsBlock) {
+ this.operatorContext = operatorContext;
+ this.sourceId = sourceId;
+ this.tsBlock = tsBlock;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlock res = tsBlock;
+ tsBlock = null;
+ return res;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return tsBlock != null && !tsBlock.isEmpty();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !hasNext();
+ }
+
+ @Override
+ public PlanNodeId getSourceId() {
+ return sourceId;
+ }
+}