You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 04:35:40 UTC

[iotdb] branch LastOperator created (now 16e1902238)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 16e1902238 Operators related to last query

This branch includes the following new commits:

     new 16e1902238 Operators related to last query

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Operators related to last query

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 16e19022380361bf456be11f4e5d0e4643497e92
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 12:35:22 2022 +0800

    Operators related to last query
---
 .../operator/process/LastQueryMergeOperator.java   |  77 ++++++++++++
 .../operator/process/UpdateLastCacheOperator.java  | 133 +++++++++++++++++++++
 .../operator/source/LastCacheScanOperator.java     |  64 ++++++++++
 3 files changed, 274 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
new file mode 100644
index 0000000000..c7e678c884
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+public class LastQueryMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final List<Operator> children;
+
+  private final int inputOperatorsCount;
+
+  private int currentIndex;
+
+  public LastQueryMergeOperator(OperatorContext operatorContext, List<Operator> children) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.currentIndex = 0;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return children.get(currentIndex).isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    return children.get(currentIndex++).next();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return currentIndex < inputOperatorsCount && children.get(currentIndex).hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return currentIndex >= inputOperatorsCount;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
new file mode 100644
index 0000000000..1daab0f0f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class UpdateLastCacheOperator implements ProcessOperator {
+
+  private static final TsBlock LAST_QUERY_EMPTY_TSBLOCK =
+      new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT))
+          .build();
+
+  private final OperatorContext operatorContext;
+
+  private final Operator child;
+
+  // fullPath for queried time series
+  private final PartialPath fullPath;
+
+  // dataType for queried time series;
+  private final String dataType;
+
+  private final DataNodeSchemaCache lastCache;
+
+  private final boolean needUpdateCache;
+
+  private final TsBlockBuilder tsBlockBuilder;
+
+  public UpdateLastCacheOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      PartialPath fullPath,
+      TSDataType dataType,
+      DataNodeSchemaCache dataNodeSchemaCache,
+      boolean needUpdateCache) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.fullPath = fullPath;
+    this.dataType = dataType.name();
+    this.lastCache = dataNodeSchemaCache;
+    this.needUpdateCache = needUpdateCache;
+    this.tsBlockBuilder =
+        new TsBlockBuilder(1, ImmutableList.of(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock res = child.next();
+    if (res == null) {
+      return null;
+    }
+    if (res.isEmpty()) {
+      return LAST_QUERY_EMPTY_TSBLOCK;
+    }
+
+    checkArgument(res.getPositionCount() == 1, "last query result should only have one record");
+
+    long lastTime = res.getColumn(0).getLong(0);
+    TsPrimitiveType lastValue = res.getColumn(1).getTsPrimitiveType(0);
+
+    if (needUpdateCache) {
+      TimeValuePair timeValuePair = new TimeValuePair(lastTime, lastValue);
+      lastCache.updateLastCache(fullPath, timeValuePair, false, Long.MIN_VALUE);
+    }
+
+    tsBlockBuilder.reset();
+    // Time
+    tsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(fullPath.getFullPath()));
+    // value
+    tsBlockBuilder.getColumnBuilder(1).writeBinary(new Binary(lastValue.getStringValue()));
+    // dataType
+    tsBlockBuilder.getColumnBuilder(2).writeBinary(new Binary(dataType));
+
+    return tsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
new file mode 100644
index 0000000000..dfb6d82c5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/LastCacheScanOperator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public class LastCacheScanOperator implements SourceOperator {
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+  private TsBlock tsBlock;
+
+  public LastCacheScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, TsBlock tsBlock) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.tsBlock = tsBlock;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock res = tsBlock;
+    tsBlock = null;
+    return res;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return tsBlock != null && !tsBlock.isEmpty();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  @Override
+  public PlanNodeId getSourceId() {
+    return sourceId;
+  }
+}