You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/12 15:40:53 UTC

[iotdb] branch xingtanzjr/mpp_memory_table created (now 73deb5c1eb)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/mpp_memory_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 73deb5c1eb complete the framework

This branch includes the following new commits:

     new 73deb5c1eb complete the framework

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: complete the framework

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_memory_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 73deb5c1ebf30dd2e276a6320182f03f428ce53b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 12 23:40:39 2022 +0800

    complete the framework
---
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 14 +++-
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  1 +
 .../db/mpp/plan/execution/QueryExecution.java      | 18 ++++-
 .../plan/execution/memory/MemorySourceHandle.java  | 80 ++++++++++++++++++++++
 .../execution/memory/StatementMemoryTable.java     | 41 +++++++++++
 .../memory/StatementMemoryTableContext.java        | 41 +++++++++++
 .../memory/StatementMemoryTableVisitor.java        | 36 ++++++++++
 .../execution/memory/MemorySourceHandleTest.java   | 42 ++++++++++++
 8 files changed, 270 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 3c1f8b3b64..21431a4af8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -88,7 +88,11 @@ public class Analysis {
   // header of result dataset
   private DatasetHeader respDatasetHeader;
 
-  public Analysis() {}
+  private boolean finishQueryAfterAnalyze;
+
+  public Analysis() {
+    this.finishQueryAfterAnalyze = false;
+  }
 
   public List<TRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
     // TODO: (xingtanzjr) implement the calculation of timePartitionIdList
@@ -235,4 +239,12 @@ public class Analysis {
   public void setGroupByTimeParameter(GroupByTimeParameter groupByTimeParameter) {
     this.groupByTimeParameter = groupByTimeParameter;
   }
+
+  public boolean isFinishQueryAfterAnalyze() {
+    return finishQueryAfterAnalyze;
+  }
+
+  public void setFinishQueryAfterAnalyze(boolean finishQueryAfterAnalyze) {
+    this.finishQueryAfterAnalyze = finishQueryAfterAnalyze;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 6217f23185..66ad541db6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -140,6 +140,7 @@ public class Analyzer {
         // If there is no leaf node in the schema tree, the query should be completed immediately
         if (schemaTree.isEmpty()) {
           analysis.setRespDatasetHeader(new DatasetHeader(new ArrayList<>(), false));
+          analysis.setFinishQueryAfterAnalyze(true);
           return analysis;
         }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 257559527c..253ccf9b79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -34,6 +34,10 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
+import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemoryTable;
+import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemoryTableContext;
+import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemoryTableVisitor;
 import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
@@ -143,7 +147,8 @@ public class QueryExecution implements IQueryExecution {
       logger.info(
           "{} execution of query will be skipped. Transit to FINISHED immediately.",
           getLogHeader());
-      stateMachine.transitionToFinished();
+      constructResultForMemoryTable();
+      stateMachine.transitionToRunning();
       return;
     }
     doLogicalPlan();
@@ -155,7 +160,16 @@ public class QueryExecution implements IQueryExecution {
   }
 
   private boolean skipExecute() {
-    return context.getQueryType() == QueryType.READ && !analysis.hasDataSource();
+    return analysis.isFinishQueryAfterAnalyze()
+        || (context.getQueryType() == QueryType.READ && !analysis.hasDataSource());
+  }
+
+  private void constructResultForMemoryTable() {
+    StatementMemoryTable memoryTable =
+        new StatementMemoryTableVisitor()
+            .process(analysis.getStatement(), new StatementMemoryTableContext(context, analysis));
+    this.resultHandle = new MemorySourceHandle(memoryTable.getTsBlock());
+    this.analysis.setRespDatasetHeader(memoryTable.getDatasetHeader());
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query need
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
new file mode 100644
index 0000000000..fcd57059c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+public class MemorySourceHandle implements ISourceHandle {
+
+  private final TsBlock result;
+  private boolean hasNext;
+
+  public MemorySourceHandle(TsBlock result) {
+    Validate.notNull(result, "the TsBlock should not be null when constructing MemorySourceHandle");
+    this.result = result;
+    this.hasNext = true;
+  }
+
+  @Override
+  public TFragmentInstanceId getLocalFragmentInstanceId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getLocalPlanNodeId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getBufferRetainedSizeInBytes() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TsBlock receive() {
+    hasNext = false;
+    return result;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return immediateFuture(null);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return false;
+  }
+
+  @Override
+  public void abort() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTable.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTable.java
new file mode 100644
index 0000000000..7fc9a0b7c7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public class StatementMemoryTable {
+  private final TsBlock tsBlock;
+  private final DatasetHeader datasetHeader;
+
+  public StatementMemoryTable(TsBlock tsBlock, DatasetHeader datasetHeader) {
+    this.tsBlock = tsBlock;
+    this.datasetHeader = datasetHeader;
+  }
+
+  public TsBlock getTsBlock() {
+    return tsBlock;
+  }
+
+  public DatasetHeader getDatasetHeader() {
+    return datasetHeader;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTableContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTableContext.java
new file mode 100644
index 0000000000..9c5a521cf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTableContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+
+public class StatementMemoryTableContext {
+  private final MPPQueryContext queryContext;
+  private final Analysis analysis;
+
+  public StatementMemoryTableContext(MPPQueryContext queryContext, Analysis analysis) {
+    this.queryContext = queryContext;
+    this.analysis = analysis;
+  }
+
+  public MPPQueryContext getQueryContext() {
+    return queryContext;
+  }
+
+  public Analysis getAnalysis() {
+    return analysis;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTableVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTableVisitor.java
new file mode 100644
index 0000000000..990a765551
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemoryTableVisitor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.memory;
+
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.util.ArrayList;
+
+public class StatementMemoryTableVisitor
+    extends StatementVisitor<StatementMemoryTable, StatementMemoryTableContext> {
+
+  @Override
+  public StatementMemoryTable visitNode(StatementNode node, StatementMemoryTableContext context) {
+    return new StatementMemoryTable(new TsBlock(0), new DatasetHeader(new ArrayList<>(), false));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
new file mode 100644
index 0000000000..fd91eb8e6b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/memory/MemorySourceHandleTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.memory;
+
+import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MemorySourceHandleTest {
+
+  @Test
+  public void testNormalFinished() {
+    TsBlock rawResult = new TsBlock(0);
+    MemorySourceHandle sourceHandle = new MemorySourceHandle(rawResult);
+    Assert.assertFalse(sourceHandle.isFinished());
+    ListenableFuture<Void> blocked = sourceHandle.isBlocked();
+    Assert.assertTrue(blocked.isDone());
+    TsBlock result = sourceHandle.receive();
+    Assert.assertEquals(rawResult, result);
+    Assert.assertTrue(sourceHandle.isFinished());
+  }
+}