You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/09 09:59:17 UTC

[iotdb] 01/01: add fast last query execution path

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FastLast
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a5e7ab9162736e8aa52537f83d4b2d1666374749
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue May 9 17:58:35 2023 +0800

    add fast last query execution path
---
 .../db/metadata/cache/DataNodeSchemaCache.java     |   2 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   4 +
 .../mpp/plan/execution/FastLastQueryExecution.java | 209 +++++++++++++++++++++
 .../db/mpp/plan/parser/StatementGenerator.java     |   3 +-
 .../plan/statement/crud/LastQueryStatement.java    |  22 +++
 5 files changed, 238 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index a9a981088c..62d661fd7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -113,7 +113,7 @@ public class DataNodeSchemaCache {
 
   public ClusterSchemaTree get(PartialPath fullPath) {
     ClusterSchemaTree clusterSchemaTree = deviceUsingTemplateSchemaCache.get(fullPath);
-    if (clusterSchemaTree == null) {
+    if (clusterSchemaTree == null || clusterSchemaTree.isEmpty()) {
       return timeSeriesSchemaCache.get(fullPath);
     } else {
       return clusterSchemaTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 0d44473624..ba85e59ecd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -35,11 +35,13 @@ import org.apache.iotdb.db.mpp.execution.QueryIdGenerator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.FastLastQueryExecution;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.execution.QueryExecution;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigExecution;
 import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LastQueryStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import org.slf4j.Logger;
@@ -110,6 +112,8 @@ public class Coordinator {
     if (statement instanceof IConfigStatement) {
       queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
       return new ConfigExecution(queryContext, statement, executor);
+    } else if (statement instanceof LastQueryStatement) {
+      return new FastLastQueryExecution(statement, queryContext, partitionFetcher, schemaFetcher);
     }
     return new QueryExecution(
         statement,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastLastQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastLastQueryExecution.java
new file mode 100644
index 0000000000..a42f1ee660
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastLastQueryExecution.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LastQueryStatement;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+
+public class FastLastQueryExecution implements IQueryExecution {
+
+  private final LastQueryStatement rawStatement;
+  private final MPPQueryContext context;
+
+  private final IPartitionFetcher partitionFetcher;
+  private final ISchemaFetcher schemaFetcher;
+
+  private final TsBlockBuilder resultTsBlockBuilder;
+
+  private long totalExecutionTime;
+
+  private final DatasetHeader lastDatasetHeader = DatasetHeaderFactory.getLastQueryHeader();
+
+  private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance();
+
+  public FastLastQueryExecution(
+      Statement rawStatement,
+      MPPQueryContext context,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
+    this.rawStatement = (LastQueryStatement) rawStatement;
+    this.context = context;
+    this.partitionFetcher = partitionFetcher;
+    this.schemaFetcher = schemaFetcher;
+
+    this.resultTsBlockBuilder = new TsBlockBuilder(lastDatasetHeader.getRespDataTypes());
+  }
+
+  @Override
+  public void start() {
+    List<PartialPath> pathPatterns = rawStatement.getFromComponent().getPrefixPaths();
+
+    PathPatternTree pathPatternTree = new PathPatternTree();
+    pathPatterns.forEach(pathPatternTree::appendPathPattern);
+    ISchemaTree schemaTree = schemaFetcher.fetchSchema(pathPatternTree, context);
+
+    for (PartialPath pathPattern : pathPatterns) {
+      List<MeasurementPath> measurementPaths = schemaTree.searchMeasurementPaths(pathPattern).left;
+      for (MeasurementPath measurementPath : measurementPaths) {
+        TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
+        if (timeValuePair != null) {
+          appendLastValue(
+              timeValuePair.getTimestamp(),
+              measurementPath.toString(),
+              timeValuePair.getValue().getStringValue(),
+              measurementPath.getSeriesType());
+        }
+      }
+    }
+  }
+
+  private void appendLastValue(
+      long lastTime, String fullPath, String lastValue, TSDataType dataType) {
+    // Time
+    resultTsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
+    // timeseries
+    resultTsBlockBuilder.getColumnBuilder(0).writeBinary(Binary.valueOf(fullPath));
+    // value
+    resultTsBlockBuilder.getColumnBuilder(1).writeBinary(Binary.valueOf(lastValue));
+    // dataType
+    resultTsBlockBuilder.getColumnBuilder(2).writeBinary(Binary.valueOf(dataType.toString()));
+    resultTsBlockBuilder.declarePosition();
+  }
+
+  @Override
+  public void stop(Throwable t) {
+    // do nothing
+  }
+
+  @Override
+  public void stopAndCleanup() {
+    // do nothing
+  }
+
+  @Override
+  public void stopAndCleanup(Throwable t) {
+    // do nothing
+  }
+
+  @Override
+  public void cancel() {
+    // do nothing
+  }
+
+  @Override
+  public ExecutionResult getStatus() {
+    return new ExecutionResult(context.getQueryId(), RpcUtils.SUCCESS_STATUS);
+  }
+
+  @Override
+  public Optional<TsBlock> getBatchResult() throws IoTDBException {
+    TsBlock result = resultTsBlockBuilder.build();
+    resultTsBlockBuilder.reset();
+    return Optional.ofNullable(result);
+  }
+
+  @Override
+  public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+    try {
+      TsBlock result = resultTsBlockBuilder.build();
+      resultTsBlockBuilder.reset();
+      if (result.isEmpty()) {
+        return Optional.empty();
+      }
+      return Optional.ofNullable(new TsBlockSerde().serialize(result));
+    } catch (IOException e) {
+      throw new IoTDBException(e, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    }
+  }
+
+  @Override
+  public boolean hasNextResult() {
+    return false;
+  }
+
+  @Override
+  public int getOutputValueColumnCount() {
+    return lastDatasetHeader.getOutputValueColumnCount();
+  }
+
+  @Override
+  public DatasetHeader getDatasetHeader() {
+    return lastDatasetHeader;
+  }
+
+  @Override
+  public boolean isQuery() {
+    return true;
+  }
+
+  @Override
+  public String getQueryId() {
+    return context.getQueryId().getId();
+  }
+
+  @Override
+  public long getStartExecutionTime() {
+    return context.getStartTime();
+  }
+
+  @Override
+  public void recordExecutionTime(long executionTime) {
+    totalExecutionTime += executionTime;
+  }
+
+  @Override
+  public long getTotalExecutionTime() {
+    return totalExecutionTime;
+  }
+
+  @Override
+  public Optional<String> getExecuteSQL() {
+    return Optional.ofNullable(context.getSql());
+  }
+
+  @Override
+  public Statement getStatement() {
+    return rawStatement;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index a450673702..1ad18be90e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LastQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
@@ -201,7 +202,7 @@ public class StatementGenerator {
             new ConstantOperand(TSDataType.INT64, Long.toString(lastDataQueryReq.getTime())));
     whereCondition.setPredicate(predicate);
 
-    QueryStatement lastQueryStatement = new QueryStatement();
+    LastQueryStatement lastQueryStatement = new LastQueryStatement();
     lastQueryStatement.setSelectComponent(selectComponent);
     lastQueryStatement.setFromComponent(fromComponent);
     lastQueryStatement.setWhereCondition(whereCondition);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastQueryStatement.java
new file mode 100644
index 0000000000..1ae9bd7fbe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastQueryStatement.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.crud;
+
+public class LastQueryStatement extends QueryStatement {}