You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/05/09 09:59:17 UTC
[iotdb] 01/01: add fast last query execution path
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/FastLast
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a5e7ab9162736e8aa52537f83d4b2d1666374749
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue May 9 17:58:35 2023 +0800
add fast last query execution path
---
.../db/metadata/cache/DataNodeSchemaCache.java | 2 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 4 +
.../mpp/plan/execution/FastLastQueryExecution.java | 209 +++++++++++++++++++++
.../db/mpp/plan/parser/StatementGenerator.java | 3 +-
.../plan/statement/crud/LastQueryStatement.java | 22 +++
5 files changed, 238 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
index a9a981088c..62d661fd7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeSchemaCache.java
@@ -113,7 +113,7 @@ public class DataNodeSchemaCache {
public ClusterSchemaTree get(PartialPath fullPath) {
ClusterSchemaTree clusterSchemaTree = deviceUsingTemplateSchemaCache.get(fullPath);
- if (clusterSchemaTree == null) {
+ if (clusterSchemaTree == null || clusterSchemaTree.isEmpty()) {
return timeSeriesSchemaCache.get(fullPath);
} else {
return clusterSchemaTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 0d44473624..ba85e59ecd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -35,11 +35,13 @@ import org.apache.iotdb.db.mpp.execution.QueryIdGenerator;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.execution.FastLastQueryExecution;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.execution.QueryExecution;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigExecution;
import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LastQueryStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
@@ -110,6 +112,8 @@ public class Coordinator {
if (statement instanceof IConfigStatement) {
queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
return new ConfigExecution(queryContext, statement, executor);
+ } else if (statement instanceof LastQueryStatement) {
+ return new FastLastQueryExecution(statement, queryContext, partitionFetcher, schemaFetcher);
}
return new QueryExecution(
statement,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastLastQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastLastQueryExecution.java
new file mode 100644
index 0000000000..a42f1ee660
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/FastLastQueryExecution.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LastQueryStatement;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+
+public class FastLastQueryExecution implements IQueryExecution {
+
+ private final LastQueryStatement rawStatement;
+ private final MPPQueryContext context;
+
+ private final IPartitionFetcher partitionFetcher;
+ private final ISchemaFetcher schemaFetcher;
+
+ private final TsBlockBuilder resultTsBlockBuilder;
+
+ private long totalExecutionTime;
+
+ private final DatasetHeader lastDatasetHeader = DatasetHeaderFactory.getLastQueryHeader();
+
+ private final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE = DataNodeSchemaCache.getInstance();
+
+ public FastLastQueryExecution(
+ Statement rawStatement,
+ MPPQueryContext context,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher) {
+ this.rawStatement = (LastQueryStatement) rawStatement;
+ this.context = context;
+ this.partitionFetcher = partitionFetcher;
+ this.schemaFetcher = schemaFetcher;
+
+ this.resultTsBlockBuilder = new TsBlockBuilder(lastDatasetHeader.getRespDataTypes());
+ }
+
+ @Override
+ public void start() {
+ List<PartialPath> pathPatterns = rawStatement.getFromComponent().getPrefixPaths();
+
+ PathPatternTree pathPatternTree = new PathPatternTree();
+ pathPatterns.forEach(pathPatternTree::appendPathPattern);
+ ISchemaTree schemaTree = schemaFetcher.fetchSchema(pathPatternTree, context);
+
+ for (PartialPath pathPattern : pathPatterns) {
+ List<MeasurementPath> measurementPaths = schemaTree.searchMeasurementPaths(pathPattern).left;
+ for (MeasurementPath measurementPath : measurementPaths) {
+ TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath);
+ if (timeValuePair != null) {
+ appendLastValue(
+ timeValuePair.getTimestamp(),
+ measurementPath.toString(),
+ timeValuePair.getValue().getStringValue(),
+ measurementPath.getSeriesType());
+ }
+ }
+ }
+ }
+
+ private void appendLastValue(
+ long lastTime, String fullPath, String lastValue, TSDataType dataType) {
+ // Time
+ resultTsBlockBuilder.getTimeColumnBuilder().writeLong(lastTime);
+ // timeseries
+ resultTsBlockBuilder.getColumnBuilder(0).writeBinary(Binary.valueOf(fullPath));
+ // value
+ resultTsBlockBuilder.getColumnBuilder(1).writeBinary(Binary.valueOf(lastValue));
+ // dataType
+ resultTsBlockBuilder.getColumnBuilder(2).writeBinary(Binary.valueOf(dataType.toString()));
+ resultTsBlockBuilder.declarePosition();
+ }
+
+ @Override
+ public void stop(Throwable t) {
+ // do nothing
+ }
+
+ @Override
+ public void stopAndCleanup() {
+ // do nothing
+ }
+
+ @Override
+ public void stopAndCleanup(Throwable t) {
+ // do nothing
+ }
+
+ @Override
+ public void cancel() {
+ // do nothing
+ }
+
+ @Override
+ public ExecutionResult getStatus() {
+ return new ExecutionResult(context.getQueryId(), RpcUtils.SUCCESS_STATUS);
+ }
+
+ @Override
+ public Optional<TsBlock> getBatchResult() throws IoTDBException {
+ TsBlock result = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ return Optional.ofNullable(result);
+ }
+
+ @Override
+ public Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException {
+ try {
+ TsBlock result = resultTsBlockBuilder.build();
+ resultTsBlockBuilder.reset();
+ if (result.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.ofNullable(new TsBlockSerde().serialize(result));
+ } catch (IOException e) {
+ throw new IoTDBException(e, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ }
+ }
+
+ @Override
+ public boolean hasNextResult() {
+ return false;
+ }
+
+ @Override
+ public int getOutputValueColumnCount() {
+ return lastDatasetHeader.getOutputValueColumnCount();
+ }
+
+ @Override
+ public DatasetHeader getDatasetHeader() {
+ return lastDatasetHeader;
+ }
+
+ @Override
+ public boolean isQuery() {
+ return true;
+ }
+
+ @Override
+ public String getQueryId() {
+ return context.getQueryId().getId();
+ }
+
+ @Override
+ public long getStartExecutionTime() {
+ return context.getStartTime();
+ }
+
+ @Override
+ public void recordExecutionTime(long executionTime) {
+ totalExecutionTime += executionTime;
+ }
+
+ @Override
+ public long getTotalExecutionTime() {
+ return totalExecutionTime;
+ }
+
+ @Override
+ public Optional<String> getExecuteSQL() {
+ return Optional.ofNullable(context.getSql());
+ }
+
+ @Override
+ public Statement getStatement() {
+ return rawStatement;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index a450673702..1ad18be90e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LastQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
@@ -201,7 +202,7 @@ public class StatementGenerator {
new ConstantOperand(TSDataType.INT64, Long.toString(lastDataQueryReq.getTime())));
whereCondition.setPredicate(predicate);
- QueryStatement lastQueryStatement = new QueryStatement();
+ LastQueryStatement lastQueryStatement = new LastQueryStatement();
lastQueryStatement.setSelectComponent(selectComponent);
lastQueryStatement.setFromComponent(fromComponent);
lastQueryStatement.setWhereCondition(whereCondition);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastQueryStatement.java
new file mode 100644
index 0000000000..1ae9bd7fbe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LastQueryStatement.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.crud;
+
+public class LastQueryStatement extends QueryStatement {}