You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/08 05:51:29 UTC

[iotdb] 01/01: Construct SinkHandle in LocalExecutionPlanner and remove the IOException from the next and hasNext methods of Operator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 135322cd49156261205c453928518cc51b0173c0
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 8 13:51:07 2022 +0800

    Construct SinkHandle in LocalExecutionPlanner and remove the IOException from the next and hasNext methods of Operator
---
 .../db/mpp/execution/FragmentInstanceManager.java  | 47 ++++++++++++------
 .../org/apache/iotdb/db/mpp/operator/Operator.java |  4 +-
 .../db/mpp/operator/process/LimitOperator.java     |  4 +-
 .../db/mpp/operator/process/TimeJoinOperator.java  |  5 +-
 .../db/mpp/operator/source/SeriesScanOperator.java | 56 ++++++++++++----------
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 35 ++++++++++++--
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |  2 +-
 .../db/mpp/operator/SeriesScanOperatorTest.java    |  2 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |  2 +-
 9 files changed, 102 insertions(+), 55 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 5c68f772e2..6ba68e55a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -93,16 +93,21 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
 
-              DataDriver driver =
-                  planner.plan(
-                      instance.getFragment().getRoot(),
-                      context,
-                      instance.getTimeFilter(),
-                      dataRegion);
-              return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              try {
+                DataDriver driver =
+                    planner.plan(
+                        instance.getFragment().getRoot(),
+                        context,
+                        instance.getTimeFilter(),
+                        dataRegion);
+                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              } catch (Throwable t) {
+                context.failed(t);
+                return null;
+              }
             });
 
-    return execution.getInstanceInfo();
+    return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
   public FragmentInstanceInfo execSchemaQueryFragmentInstance(
@@ -121,12 +126,16 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
 
-              SchemaDriver driver =
-                  planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
-              return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              try {
+                SchemaDriver driver =
+                    planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
+                return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+              } catch (Throwable t) {
+                context.failed(t);
+                return null;
+              }
             });
-
-    return execution.getInstanceInfo();
+    return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
   public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
@@ -154,14 +163,22 @@ public class FragmentInstanceManager {
     return execution.getInstanceInfo();
   }
 
+  private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
+    return new FragmentInstanceInfo(
+        FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
+  }
+
   private void removeOldTasks() {
     long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
-    instanceExecution
+    instanceContext
         .entrySet()
         .removeIf(
             entry -> {
               FragmentInstanceId instanceId = entry.getKey();
-              FragmentInstanceExecution execution = entry.getValue();
+              FragmentInstanceExecution execution = instanceExecution.get(instanceId);
+              if (execution == null) {
+                return true;
+              }
               long endTime = execution.getInstanceInfo().getEndTime();
               if (endTime != -1 && endTime <= oldestAllowedInstance) {
                 instanceContext.remove(instanceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index af18fb2ba0..c8cccc0520 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -41,10 +41,10 @@ public interface Operator extends AutoCloseable {
   }
 
   /** Gets next tsBlock from this operator. If no data is currently available, return null. */
-  TsBlock next() throws IOException;
+  TsBlock next();
 
   /** @return true if the operator has more data, otherwise false */
-  boolean hasNext() throws IOException;
+  boolean hasNext();
 
   /** This method will always be called before releasing the Operator reference. */
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index ef5466854f..a226637146 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -53,7 +53,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() throws IOException {
+  public TsBlock next() {
     TsBlock block = child.next();
     TsBlock res = block;
     if (block.getPositionCount() <= remainingLimit) {
@@ -66,7 +66,7 @@ public class LimitOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() throws IOException {
+  public boolean hasNext() {
     return remainingLimit > 0 && child.hasNext();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index e38a150dff..f58af90e77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.IOException;
 import java.util.List;
 
 public class TimeJoinOperator implements ProcessOperator {
@@ -96,7 +95,7 @@ public class TimeJoinOperator implements ProcessOperator {
   }
 
   @Override
-  public TsBlock next() throws IOException {
+  public TsBlock next() {
     // end time for returned TsBlock this time, it's the min end time among all the children
     // TsBlocks
     long currentEndTime = 0;
@@ -154,7 +153,7 @@ public class TimeJoinOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() throws IOException {
+  public boolean hasNext() {
     if (finished) {
       return false;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index acb40baeaa..e979710c3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -63,47 +63,51 @@ public class SeriesScanOperator implements SourceOperator {
   }
 
   @Override
-  public TsBlock next() throws IOException {
+  public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
       hasCachedTsBlock = false;
       return tsBlock;
     }
-    throw new IOException("no next batch");
+    throw new IllegalStateException("no next batch");
   }
 
   @Override
-  public boolean hasNext() throws IOException {
+  public boolean hasNext() {
 
-    if (hasCachedTsBlock) {
-      return true;
-    }
-
-    /*
-     * consume page data firstly
-     */
-    if (readPageData()) {
-      hasCachedTsBlock = true;
-      return true;
-    }
+    try {
+      if (hasCachedTsBlock) {
+        return true;
+      }
 
-    /*
-     * consume chunk data secondly
-     */
-    if (readChunkData()) {
-      hasCachedTsBlock = true;
-      return true;
-    }
+      /*
+       * consume page data firstly
+       */
+      if (readPageData()) {
+        hasCachedTsBlock = true;
+        return true;
+      }
 
-    /*
-     * consume next file finally
-     */
-    while (seriesScanUtil.hasNextFile()) {
+      /*
+       * consume chunk data secondly
+       */
       if (readChunkData()) {
         hasCachedTsBlock = true;
         return true;
       }
+
+      /*
+       * consume next file finally
+       */
+      while (seriesScanUtil.hasNextFile()) {
+        if (readChunkData()) {
+          hasCachedTsBlock = true;
+          return true;
+        }
+      }
+      return hasCachedTsBlock;
+    } catch (IOException e) {
+      throw new RuntimeException("Error happened while scanning the file", e);
     }
-    return hasCachedTsBlock;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index ece32553b0..1af476b7a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager;
+import org.apache.iotdb.db.mpp.buffer.DataBlockService;
 import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.execution.DataDriver;
 import org.apache.iotdb.db.mpp.execution.DataDriverContext;
@@ -51,8 +55,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -67,6 +73,9 @@ import static java.util.Objects.requireNonNull;
  */
 public class LocalExecutionPlanner {
 
+  private static final DataBlockManager DATA_BLOCK_MANAGER =
+      DataBlockService.getInstance().getDataBlockManager();
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -212,6 +221,7 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) {
+
       // TODO(jackie tien) create SourceHandle here
       return super.visitExchange(node, context);
     }
@@ -219,10 +229,27 @@ public class LocalExecutionPlanner {
     @Override
     public Operator visitFragmentSink(FragmentSinkNode node, LocalExecutionPlanContext context) {
       Operator child = node.getChild().accept(this, context);
-      // TODO(jackie tien) create SinkHandle here
-      ISinkHandle sinkHandle = null;
-      context.setSinkHandle(sinkHandle);
-      return child;
+      Endpoint target = node.getDownStreamEndpoint();
+      FragmentInstanceId localInstanceId = context.instanceContext.getId();
+      FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
+      try {
+        ISinkHandle sinkHandle =
+            DATA_BLOCK_MANAGER.createSinkHandle(
+                new TFragmentInstanceId(
+                    localInstanceId.getQueryId().getId(),
+                    String.valueOf(localInstanceId.getFragmentId().getId()),
+                    localInstanceId.getInstanceId()),
+                target.getIp(),
+                new TFragmentInstanceId(
+                    targetInstanceId.getQueryId().getId(),
+                    String.valueOf(targetInstanceId.getFragmentId().getId()),
+                    targetInstanceId.getInstanceId()),
+                node.getDownStreamPlanNodeId().getId());
+        context.setSinkHandle(sinkHandle);
+        return child;
+      } catch (IOException e) {
+        throw new RuntimeException("Error happened while creating sink handle", e);
+      }
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index f1e71d4086..f0c8e7e5c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -162,7 +162,7 @@ public class LimitOperatorTest {
         count++;
       }
       assertEquals(13, count);
-    } catch (IOException | IllegalPathException e) {
+    } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index db5b568c3e..669e374bbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -118,7 +118,7 @@ public class SeriesScanOperatorTest {
         count++;
       }
       assertEquals(25, count);
-    } catch (IOException | IllegalPathException e) {
+    } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 2e49975d50..2289ae7f4e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -148,7 +148,7 @@ public class TimeJoinOperatorTest {
         count++;
       }
       assertEquals(25, count);
-    } catch (IOException | IllegalPathException e) {
+    } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
     }