You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 04:35:41 UTC

[iotdb] 01/01: Operators related to last query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16e19022380361bf456be11f4e5d0e4643497e92
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 12:35:22 2022 +0800

    Operators related to last query
---
 .../operator/process/LastQueryMergeOperator.java   |  77 ++++++++++++
 .../operator/process/UpdateLastCacheOperator.java  | 133 +++++++++++++++++++++
 .../operator/source/LastCacheScanOperator.java     |  64 ++++++++++
 3 files changed, 274 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
new file mode 100644
index 0000000000..c7e678c884
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+public class LastQueryMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final List<Operator> children;
+
+  private final int inputOperatorsCount;
+
+  private int currentIndex;
+
+  public LastQueryMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.currentIndex = 0;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return children.get(currentIndex).isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    return children.get(currentIndex++).next();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return currentIndex < inputOperatorsCount && children.get(currentIndex).hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return currentIndex >= inputOperatorsCount;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
new file mode 100644
index 0000000000..1daab0f0f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Converts the single-row result of its child into the output layout of a last query and,
+ * optionally, pushes that last point into the {@link DataNodeSchemaCache}.
+ *
+ * <p>The produced block has a time column holding the last timestamp plus three TEXT columns:
+ * the full path of the time series, the last value rendered as a string, and the name of the
+ * series' data type.
+ */
+public class UpdateLastCacheOperator implements ProcessOperator {
+
+  // returned whenever the child has no last point to report
+  private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
+      new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
+          .build();
+
+  private final OperatorContext operatorContext;
+
+  private final Operator child;
+
+  // fullPath for queried time series
+  private final PartialPath fullPath;
+
+  // dataType name for queried time series
+  private final String dataType;
+
+  private final DataNodeSchemaCache lastCache;
+
+  private final boolean needUpdateCache;
+
+  // reused for every output block; reset before each use
+  private final TsBlockBuilder tsBlockBuilder;
+
+  /**
+   * @param operatorContext context returned as-is by {@link #getOperatorContext()}
+   * @param child operator producing the last point; column 0 is read as the last time and
+   *     column 1 as the last value
+   * @param fullPath full path of the queried time series
+   * @param dataType data type of the queried time series; only its name is kept
+   * @param dataNodeSchemaCache cache that receives the last point when {@code needUpdateCache}
+   *     is set
+   * @param needUpdateCache whether the last point should be written into the cache
+   */
+  public UpdateLastCacheOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      PartialPath fullPath,
+      TSDataType dataType,
+      DataNodeSchemaCache dataNodeSchemaCache,
+      boolean needUpdateCache) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.fullPath = fullPath;
+    this.dataType = dataType.name();
+    this.lastCache = dataNodeSchemaCache;
+    this.needUpdateCache = needUpdateCache;
+    this.tsBlockBuilder =
+        new TsBlockBuilder(1, ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * @return {@code null} if the child returned {@code null}; an empty block if the child
+   *     returned an empty block or a null last time; otherwise a one-row block in the last
+   *     query layout
+   * @throws IllegalArgumentException if the child returns more than one row
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock res = child.next();
+    if (res == null) {
+      return null;
+    }
+    if (res.isEmpty()) {
+      return LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
+
+    // A null last time means the series has no data: there is nothing to cache or to output,
+    // and reading the primitive below would yield a meaningless default.
+    if (res.getColumn(0).isNull(0)) {
+      return LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    long lastTime = res.getColumn(0).getLong(0);
+    TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
+
+    if (needUpdateCache) {
+      TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
+      // NOTE(review): the trailing (false, Long.MIN_VALUE) arguments are passed through
+      // unchanged; confirm their meaning against DataNodeSchemaCache#updateLastCache.
+      lastCache.updateLastCache(fullPath, timeValuePair, false, Long.MIN_VALUE);
+    }
+
+    tsBlockBuilder.reset();
+    // Time
+    tsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(fullPath.getFullPath()));
+    // value
+    tsBlockBuilder.getColumnBuilder(1).writeBinary(new Binary(lastValue.getStringValue()));
+    // dataType
+    tsBlockBuilder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+    // register the row that was just written so that build() accounts for it
+    tsBlockBuilder.declarePosition();
+
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
new file mode 100644
index 0000000000..dfb6d82c5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+/**
+ * Source operator that hands out a single, pre-built {@link TsBlock} exactly once.
+ *
+ * <p>The block is supplied at construction time; after it has been returned by {@link #next()}
+ * the operator holds no more data.
+ */
+public class LastCacheScanOperator implements SourceOperator {
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+
+  // pending block; set to null as soon as it has been handed out
+  private TsBlock tsBlock;
+
+  /**
+   * @param operatorContext context returned as-is by {@link #getOperatorContext()}
+   * @param sourceId id returned as-is by {@link #getSourceId()}
+   * @param tsBlock the block to emit; may be {@code null} or empty, in which case the operator
+   *     has nothing to return
+   */
+  public LastCacheScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, TsBlock tsBlock) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.tsBlock = tsBlock;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Returns the pending block (possibly {@code null}) and forgets it. */
+  @Override
+  public TsBlock next() {
+    final TsBlock pending = tsBlock;
+    tsBlock = null;
+    return pending;
+  }
+
+  /** @return {@code true} only while a non-empty block is still pending */
+  @Override
+  public boolean hasNext() {
+    if (tsBlock == null) {
+      return false;
+    }
+    return !tsBlock.isEmpty();
+  }
+
+  /** @return {@code true} when no non-empty block is pending */
+  @Override
+  public boolean isFinished() {
+    return tsBlock == null || tsBlock.isEmpty();
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+}