You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/04 10:27:46 UTC

[iotdb] branch FIDig updated: pipeline local source handle

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FIDig
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/FIDig by this push:
     new b42e8c4171 pipeline local source handle
b42e8c4171 is described below

commit b42e8c41712e98d5182841e0b3f9020cfa9a7539
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 4 18:27:36 2023 +0800

    pipeline local source handle
---
 .../execution/exchange/MPPDataExchangeManager.java |  3 +-
 .../exchange/source/LocalSourceHandle.java         |  2 +-
 .../exchange/source/PipelineSourceHandle.java      | 39 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index b8a6bd9dee..7211534255 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.execution.exchange.sink.ShuffleSinkHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.SinkChannel;
 import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.PipelineSourceHandle;
 import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
@@ -652,7 +653,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    */
   public ISourceHandle createLocalSourceHandleForPipeline(
       SharedTsBlockQueue queue, DriverContext context) {
-    return new LocalSourceHandle(
+    return new PipelineSourceHandle(
         queue,
         new PipelineSourceHandleListenerImpl(context::failed),
         context.getDriverTaskID().toString());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 749b8b6274..025a59ed4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -68,7 +68,7 @@ public class LocalSourceHandle implements ISourceHandle {
   private TFragmentInstanceId localFragmentInstanceId;
   private String localPlanNodeId;
   private final SourceHandleListener sourceHandleListener;
-  private final SharedTsBlockQueue queue;
+  protected final SharedTsBlockQueue queue;
   private boolean aborted = false;
 
   private boolean closed = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/PipelineSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/PipelineSourceHandle.java
new file mode 100644
index 0000000000..e4b408cd9a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/PipelineSourceHandle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.exchange.source;
+
+import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
+import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
+
+public class PipelineSourceHandle extends LocalSourceHandle { // LocalSourceHandle subclass; its setMaxBytesCanReserve only ever lowers the queue's limit
+
+  public PipelineSourceHandle(
+      SharedTsBlockQueue queue,
+      MPPDataExchangeManager.SourceHandleListener sourceHandleListener,
+      String threadName) {
+    super(queue, sourceHandleListener, threadName); // all three arguments are handed to LocalSourceHandle unchanged
+  }
+
+  @Override
+  public void setMaxBytesCanReserve(long maxBytesCanReserve) {
+    if (maxBytesCanReserve < queue.getMaxBytesCanReserve()) { // apply the new value only when it is strictly smaller than the queue's current one
+      queue.setMaxBytesCanReserve(maxBytesCanReserve);
+    } // an equal or larger value is ignored, leaving the queue's limit as it was
+  }
+}