You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/08 05:51:28 UTC

[iotdb] branch ty-mpp created (now 135322cd49)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 135322cd49 Construct SinkHandle in LocalExecutionPlanner and remove the IOExecption in next and hasNext method of Operator

This branch includes the following new commits:

     new 135322cd49 Construct SinkHandle in LocalExecutionPlanner and remove the IOExecption in next and hasNext method of Operator

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Construct SinkHandle in LocalExecutionPlanner and remove the IOExecption in next and hasNext method of Operator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 135322cd49156261205c453928518cc51b0173c0
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 8 13:51:07 2022 +0800

    Construct SinkHandle in LocalExecutionPlanner and remove the IOExecption in next and hasNext method of Operator
---
 .../db/mpp/execution/FragmentInstanceManager.java  | 47 ++++++++++++------
 .../org/apache/iotdb/db/mpp/operator/Operator.java |  4 +-
 .../db/mpp/operator/process/LimitOperator.java     |  4 +-
 .../db/mpp/operator/process/TimeJoinOperator.java  |  5 +-
 .../db/mpp/operator/source/SeriesScanOperator.java | 56 ++++++++++++----------
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 35 ++++++++++++--
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |  2 +-
 .../db/mpp/operator/SeriesScanOperatorTest.java    |  2 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |  2 +-
 9 files changed, 102 insertions(+), 55 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 5c68f772e2..6ba68e55a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -93,16 +93,21 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
 
-              DataDriver driver =
-                  planner.plan(
-                      instance.getFragment().getRoot(),
-                      context,
-                      instance.getTimeFilter(),
-                      dataRegion);
-              return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              try {
+                DataDriver driver =
+                    planner.plan(
+                        instance.getFragment().getRoot(),
+                        context,
+                        instance.getTimeFilter(),
+                        dataRegion);
+                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              } catch (Throwable t) {
+                context.failed(t);
+                return null;
+              }
             });
 
-    return execution.getInstanceInfo();
+    return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
   public FragmentInstanceInfo execSchemaQueryFragmentInstance(
@@ -121,12 +126,16 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
 
-              SchemaDriver driver =
-                  planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
-              return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              try {
+                SchemaDriver driver =
+                    planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
+                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              } catch (Throwable t) {
+                context.failed(t);
+                return null;
+              }
             });
-
-    return execution.getInstanceInfo();
+    return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
   public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
@@ -154,14 +163,22 @@ public class FragmentInstanceManager {
     return execution.getInstanceInfo();
   }
 
+  private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
+    return new FragmentInstanceInfo(
+        FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
+  }
+
   private void removeOldTasks() {
     long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
-    instanceExecution
+    instanceContext
         .entrySet()
         .removeIf(
             entry -> {
               FragmentInstanceId instanceId = entry.getKey();
-              FragmentInstanceExecution execution = entry.getValue();
+              FragmentInstanceExecution execution = instanceExecution.get(instanceId);
+              if (execution == null) {
+                return true;
+              }
               long endTime = execution.getInstanceInfo().getEndTime();
               if (endTime != -1 && endTime <= oldestAllowedInstance) {
                 instanceContext.remove(instanceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index af18fb2ba0..c8cccc0520 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -41,10 +41,10 @@ public interface Operator extends AutoCloseable {
   }
 
   /** Gets next tsBlock from this operator. If no data is currently available, return null. */
-  TsBlock next() throws IOException;
+  TsBlock next();
 
   /** @return true if the operator has more data, otherwise false */
-  boolean hasNext() throws IOException;
+  boolean hasNext();
 
   /** This method will always be called before releasing the Operator reference. */
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index ef5466854f..a226637146 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -53,7 +53,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() throws IOException {
+  public TsBlock next() {
     TsBlock block = child.next();
     TsBlock res = block;
     if (block.getPositionCount() <= remainingLimit) {
@@ -66,7 +66,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() throws IOException {
+  public boolean hasNext() {
     return remainingLimit > 0 && child.hasNext();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index e38a150dff..f58af90e77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
 import java.util.List;
 
 public class TimeJoinOperator implements ProcessOperator {
@@ -96,7 +95,7 @@ public class TimeJoinOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() throws IOException {
+  public TsBlock next() {
     // end time for returned TsBlock this time, it's the min end time among all the children
     // TsBlocks
     long currentEndTime = 0;
@@ -154,7 +153,7 @@ public class TimeJoinOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() throws IOException {
+  public boolean hasNext() {
     if (finished) {
       return false;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index acb40baeaa..e979710c3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -63,47 +63,51 @@ public class SeriesScanOperator implements SourceOperator {
   }
 
   @Override
-  public TsBlock next() throws IOException {
+  public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
       return tsBlock;
     }
-    throw new IOException("no next batch");
+    throw new IllegalStateException("no next batch");
   }
 
   @Override
-  public boolean hasNext() throws IOException {
+  public boolean hasNext() {
 
-    if (hasCachedTsBlock) {
-      return true;
-    }
-
-    /*
-     * consume page data firstly
-     */
-    if (readPageData()) {
-      hasCachedTsBlock = true;
-      return true;
-    }
+    try {
+      if (hasCachedTsBlock) {
+        return true;
+      }
 
-    /*
-     * consume chunk data secondly
-     */
-    if (readChunkData()) {
-      hasCachedTsBlock = true;
-      return true;
-    }
+      /*
+       * consume page data firstly
+       */
+      if (readPageData()) {
+        hasCachedTsBlock = true;
+        return true;
+      }
 
-    /*
-     * consume next file finally
-     */
-    while (seriesScanUtil.hasNextFile()) {
+      /*
+       * consume chunk data secondly
+       */
       if (readChunkData()) {
         hasCachedTsBlock = true;
         return true;
       }
+
+      /*
+       * consume next file finally
+       */
+      while (seriesScanUtil.hasNextFile()) {
+        if (readChunkData()) {
+          hasCachedTsBlock = true;
+          return true;
+        }
+      }
+      return hasCachedTsBlock;
+    } catch (IOException e) {
+      throw new RuntimeException("Error happened while scanning the file", e);
     }
-    return hasCachedTsBlock;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index ece32553b0..1af476b7a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager;
+import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.execution.DataDriver;
 import org.apache.iotdb.db.mpp.execution.DataDriverContext;
@@ -51,8 +55,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -67,6 +73,9 @@ import static java.util.Objects.requireNonNull;
  */
 public class LocalExecutionPlanner {
 
+  private static final DataBlockManager DATA_BLOCK_MANAGER =
+      DataBlockService.getInstance().getDataBlockManager();
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -212,6 +221,7 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) {
+
       // TODO(jackie tien) create SourceHandle here
       return super.visitExchange(node, context);
     }
@@ -219,10 +229,27 @@ public class LocalExecutionPlanner {
     @Override
     public Operator visitFragmentSink(FragmentSinkNode node, LocalExecutionPlanContext context) {
       Operator child = node.getChild().accept(this, context);
-      // TODO(jackie tien) create SinkHandle here
-      ISinkHandle sinkHandle = null;
-      context.setSinkHandle(sinkHandle);
-      return child;
+      Endpoint target = node.getDownStreamEndpoint();
+      FragmentInstanceId localInstanceId = context.instanceContext.getId();
+      FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
+      try {
+        ISinkHandle sinkHandle =
+            DATA_BLOCK_MANAGER.createSinkHandle(
+                new TFragmentInstanceId(
+                    localInstanceId.getQueryId().getId(),
+                    String.valueOf(localInstanceId.getFragmentId().getId()),
+                    localInstanceId.getInstanceId()),
+                target.getIp(),
+                new TFragmentInstanceId(
+                    targetInstanceId.getQueryId().getId(),
+                    String.valueOf(targetInstanceId.getFragmentId().getId()),
+                    targetInstanceId.getInstanceId()),
+                node.getDownStreamPlanNodeId().getId());
+        context.setSinkHandle(sinkHandle);
+        return child;
+      } catch (IOException e) {
+        throw new RuntimeException("Error happened while creating sink handle", e);
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index f1e71d4086..f0c8e7e5c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -162,7 +162,7 @@ public class LimitOperatorTest {
         count++;
       }
       assertEquals(13, count);
-    } catch (IOException | IllegalPathException e) {
+    } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index db5b568c3e..669e374bbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -118,7 +118,7 @@ public class SeriesScanOperatorTest {
         count++;
       }
       assertEquals(25, count);
-    } catch (IOException | IllegalPathException e) {
+    } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 2e49975d50..2289ae7f4e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -148,7 +148,7 @@ public class TimeJoinOperatorTest {
         count++;
       }
       assertEquals(25, count);
-    } catch (IOException | IllegalPathException e) {
+    } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
     }