You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/08 05:51:29 UTC
[iotdb] 01/01: Construct SinkHandle in LocalExecutionPlanner and remove the IOException in next and hasNext methods of Operator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 135322cd49156261205c453928518cc51b0173c0
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 8 13:51:07 2022 +0800
Construct SinkHandle in LocalExecutionPlanner and remove the IOException in next and hasNext methods of Operator
---
.../db/mpp/execution/FragmentInstanceManager.java | 47 ++++++++++++------
.../org/apache/iotdb/db/mpp/operator/Operator.java | 4 +-
.../db/mpp/operator/process/LimitOperator.java | 4 +-
.../db/mpp/operator/process/TimeJoinOperator.java | 5 +-
.../db/mpp/operator/source/SeriesScanOperator.java | 56 ++++++++++++----------
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 35 ++++++++++++--
.../iotdb/db/mpp/operator/LimitOperatorTest.java | 2 +-
.../db/mpp/operator/SeriesScanOperatorTest.java | 2 +-
.../db/mpp/operator/TimeJoinOperatorTest.java | 2 +-
9 files changed, 102 insertions(+), 55 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
index 5c68f772e2..6ba68e55a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -93,16 +93,21 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
- DataDriver driver =
- planner.plan(
- instance.getFragment().getRoot(),
- context,
- instance.getTimeFilter(),
- dataRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ try {
+ DataDriver driver =
+ planner.plan(
+ instance.getFragment().getRoot(),
+ context,
+ instance.getTimeFilter(),
+ dataRegion);
+ return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ } catch (Throwable t) {
+ context.failed(t);
+ return null;
+ }
});
- return execution.getInstanceInfo();
+ return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
}
public FragmentInstanceInfo execSchemaQueryFragmentInstance(
@@ -121,12 +126,16 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId -> new FragmentInstanceContext(fragmentInstanceId, state));
- SchemaDriver driver =
- planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
- return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ try {
+ SchemaDriver driver =
+ planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
+ return new FragmentInstanceExecution(scheduler, instanceId, context, driver, state);
+ } catch (Throwable t) {
+ context.failed(t);
+ return null;
+ }
});
-
- return execution.getInstanceInfo();
+ return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
}
public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
@@ -154,14 +163,22 @@ public class FragmentInstanceManager {
return execution.getInstanceInfo();
}
+ private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
+ return new FragmentInstanceInfo(
+ FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime());
+ }
+
private void removeOldTasks() {
long oldestAllowedInstance = System.currentTimeMillis() - infoCacheTime.toMillis();
- instanceExecution
+ instanceContext
.entrySet()
.removeIf(
entry -> {
FragmentInstanceId instanceId = entry.getKey();
- FragmentInstanceExecution execution = entry.getValue();
+ FragmentInstanceExecution execution = instanceExecution.get(instanceId);
+ if (execution == null) {
+ return true;
+ }
long endTime = execution.getInstanceInfo().getEndTime();
if (endTime != -1 && endTime <= oldestAllowedInstance) {
instanceContext.remove(instanceId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index af18fb2ba0..c8cccc0520 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -41,10 +41,10 @@ public interface Operator extends AutoCloseable {
}
/** Gets next tsBlock from this operator. If no data is currently available, return null. */
- TsBlock next() throws IOException;
+ TsBlock next();
/** @return true if the operator has more data, otherwise false */
- boolean hasNext() throws IOException;
+ boolean hasNext();
/** This method will always be called before releasing the Operator reference. */
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index ef5466854f..a226637146 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -53,7 +53,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public TsBlock next() throws IOException {
+ public TsBlock next() {
TsBlock block = child.next();
TsBlock res = block;
if (block.getPositionCount() <= remainingLimit) {
@@ -66,7 +66,7 @@ public class LimitOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() throws IOException {
+ public boolean hasNext() {
return remainingLimit > 0 && child.hasNext();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index e38a150dff..f58af90e77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
import java.util.List;
public class TimeJoinOperator implements ProcessOperator {
@@ -96,7 +95,7 @@ public class TimeJoinOperator implements ProcessOperator {
}
@Override
- public TsBlock next() throws IOException {
+ public TsBlock next() {
// end time for returned TsBlock this time, it's the min end time among all the children
// TsBlocks
long currentEndTime = 0;
@@ -154,7 +153,7 @@ public class TimeJoinOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() throws IOException {
+ public boolean hasNext() {
if (finished) {
return false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index acb40baeaa..e979710c3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -63,47 +63,51 @@ public class SeriesScanOperator implements SourceOperator {
}
@Override
- public TsBlock next() throws IOException {
+ public TsBlock next() {
if (hasCachedTsBlock || hasNext()) {
hasCachedTsBlock = false;
return tsBlock;
}
- throw new IOException("no next batch");
+ throw new IllegalStateException("no next batch");
}
@Override
- public boolean hasNext() throws IOException {
+ public boolean hasNext() {
- if (hasCachedTsBlock) {
- return true;
- }
-
- /*
- * consume page data firstly
- */
- if (readPageData()) {
- hasCachedTsBlock = true;
- return true;
- }
+ try {
+ if (hasCachedTsBlock) {
+ return true;
+ }
- /*
- * consume chunk data secondly
- */
- if (readChunkData()) {
- hasCachedTsBlock = true;
- return true;
- }
+ /*
+ * consume page data firstly
+ */
+ if (readPageData()) {
+ hasCachedTsBlock = true;
+ return true;
+ }
- /*
- * consume next file finally
- */
- while (seriesScanUtil.hasNextFile()) {
+ /*
+ * consume chunk data secondly
+ */
if (readChunkData()) {
hasCachedTsBlock = true;
return true;
}
+
+ /*
+ * consume next file finally
+ */
+ while (seriesScanUtil.hasNextFile()) {
+ if (readChunkData()) {
+ hasCachedTsBlock = true;
+ return true;
+ }
+ }
+ return hasCachedTsBlock;
+ } catch (IOException e) {
+ throw new RuntimeException("Error happened while scanning the file", e);
}
- return hasCachedTsBlock;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index ece32553b0..1af476b7a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,10 +18,14 @@
*/
package org.apache.iotdb.db.mpp.sql.planner;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.mpp.buffer.DataBlockManager;
+import org.apache.iotdb.db.mpp.buffer.DataBlockService;
import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.execution.DataDriver;
import org.apache.iotdb.db.mpp.execution.DataDriverContext;
@@ -51,8 +55,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -67,6 +73,9 @@ import static java.util.Objects.requireNonNull;
*/
public class LocalExecutionPlanner {
+ private static final DataBlockManager DATA_BLOCK_MANAGER =
+ DataBlockService.getInstance().getDataBlockManager();
+
public static LocalExecutionPlanner getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -212,6 +221,7 @@ public class LocalExecutionPlanner {
@Override
public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) {
+
// TODO(jackie tien) create SourceHandle here
return super.visitExchange(node, context);
}
@@ -219,10 +229,27 @@ public class LocalExecutionPlanner {
@Override
public Operator visitFragmentSink(FragmentSinkNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
- // TODO(jackie tien) create SinkHandle here
- ISinkHandle sinkHandle = null;
- context.setSinkHandle(sinkHandle);
- return child;
+ Endpoint target = node.getDownStreamEndpoint();
+ FragmentInstanceId localInstanceId = context.instanceContext.getId();
+ FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
+ try {
+ ISinkHandle sinkHandle =
+ DATA_BLOCK_MANAGER.createSinkHandle(
+ new TFragmentInstanceId(
+ localInstanceId.getQueryId().getId(),
+ String.valueOf(localInstanceId.getFragmentId().getId()),
+ localInstanceId.getInstanceId()),
+ target.getIp(),
+ new TFragmentInstanceId(
+ targetInstanceId.getQueryId().getId(),
+ String.valueOf(targetInstanceId.getFragmentId().getId()),
+ targetInstanceId.getInstanceId()),
+ node.getDownStreamPlanNodeId().getId());
+ context.setSinkHandle(sinkHandle);
+ return child;
+ } catch (IOException e) {
+ throw new RuntimeException("Error happened while creating sink handle", e);
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index f1e71d4086..f0c8e7e5c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -162,7 +162,7 @@ public class LimitOperatorTest {
count++;
}
assertEquals(13, count);
- } catch (IOException | IllegalPathException e) {
+ } catch (IllegalPathException e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index db5b568c3e..669e374bbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -118,7 +118,7 @@ public class SeriesScanOperatorTest {
count++;
}
assertEquals(25, count);
- } catch (IOException | IllegalPathException e) {
+ } catch (IllegalPathException e) {
e.printStackTrace();
fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 2e49975d50..2289ae7f4e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -148,7 +148,7 @@ public class TimeJoinOperatorTest {
count++;
}
assertEquals(25, count);
- } catch (IOException | IllegalPathException e) {
+ } catch (IllegalPathException e) {
e.printStackTrace();
fail();
}