You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/22 09:31:27 UTC

[iotdb] branch rel/1.2 updated: [To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new ab2e107b246 [To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)
ab2e107b246 is described below

commit ab2e107b246686949c2313fbc914f256e2392042
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Fri Sep 22 17:31:21 2023 +0800

    [To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)
    
    * [IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11083)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
    
    (cherry picked from commit 4d7aaeb8c9c46b5640d8b66b1193bc23c2a569db)
    
    * remove airgapConnector entrance
    
    * remove IT
---
 .../config/constant/PipeConnectorConstant.java     |   6 +
 .../connector/protocol/opcua/OpcUaConnector.java   |  72 ++++++++---
 .../protocol/websocket/WebSocketConnector.java     |  56 +++++++--
 .../connector/PipeConnectorSubtaskManager.java     | 132 ++++++++++++++-------
 4 files changed, 194 insertions(+), 72 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8135ae577ad..3414b7a9b29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 
 public class PipeConnectorConstant {
@@ -29,6 +31,10 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls";
 
+  public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = "connector.parallel.tasks";
+  public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
+      PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 703e4652d99..bc86efb6ec1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -28,9 +28,11 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
@@ -43,7 +45,10 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
@@ -63,6 +68,10 @@ public class OpcUaConnector implements PipeConnector {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaConnector.class);
 
+  private static final Map<String, Pair<AtomicInteger, OpcUaServer>>
+      SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+  private String serverKey;
   private OpcUaServer server;
 
   @Override
@@ -86,14 +95,31 @@ public class OpcUaConnector implements PipeConnector {
         parameters.getStringOrDefault(
             CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
 
-    server =
-        new OpcUaServerBuilder()
-            .setTcpBindPort(tcpBindPort)
-            .setHttpsBindPort(httpsBindPort)
-            .setUser(user)
-            .setPassword(password)
-            .build();
-    server.startup();
+    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      serverKey = httpsBindPort + ":" + tcpBindPort;
+
+      server =
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP
+              .computeIfAbsent(
+                  serverKey,
+                  key -> {
+                    try {
+                      final OpcUaServer newServer =
+                          new OpcUaServerBuilder()
+                              .setTcpBindPort(tcpBindPort)
+                              .setHttpsBindPort(httpsBindPort)
+                              .setUser(user)
+                              .setPassword(password)
+                              .build();
+                      newServer.startup();
+                      return new Pair<>(new AtomicInteger(0), newServer);
+                    } catch (Exception e) {
+                      throw new PipeException("Failed to build and startup OpcUaServer", e);
+                    }
+                  })
+              .getRight();
+      SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet();
+    }
   }
 
   @Override
@@ -106,6 +132,11 @@ public class OpcUaConnector implements PipeConnector {
     // Server side, do nothing
   }
 
+  @Override
+  public void transfer(Event event) throws Exception {
+    // Do nothing when receiving heartbeat or other events
+  }
+
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
@@ -234,13 +265,26 @@ public class OpcUaConnector implements PipeConnector {
     }
   }
 
-  @Override
-  public void transfer(Event event) throws Exception {
-    // Do nothing when receive heartbeat or other events
-  }
-
   @Override
   public void close() throws Exception {
-    server.shutdown();
+    if (serverKey == null) {
+      return;
+    }
+
+    synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      final Pair<AtomicInteger, OpcUaServer> pair =
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().shutdown();
+        } finally {
+          SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey);
+        }
+      }
+    }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 879cd9e2423..37b12170582 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -39,15 +39,21 @@ import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
 import java.util.Comparator;
+import java.util.Map;
 import java.util.Optional;
 import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class WebSocketConnector implements PipeConnector {
   private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnector.class);
-  private final AtomicReference<WebSocketConnectorServer> server = new AtomicReference<>();
-  private int port;
+
+  private static final Map<Integer, Pair<AtomicInteger, WebSocketConnectorServer>>
+      PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+  private Integer port;
+  private WebSocketConnectorServer server;
 
   public final AtomicLong commitIdGenerator = new AtomicLong(0);
   private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -68,13 +74,19 @@ public class WebSocketConnector implements PipeConnector {
 
   @Override
   public void handshake() throws Exception {
-    if (server.get() == null) {
-      synchronized (server) {
-        if (server.get() == null) {
-          server.set(new WebSocketConnectorServer(new InetSocketAddress(port), this));
-          server.get().start();
-        }
-      }
+    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      server =
+          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP
+              .computeIfAbsent(
+                  port,
+                  key -> {
+                    final WebSocketConnectorServer newServer =
+                        new WebSocketConnectorServer(new InetSocketAddress(port), this);
+                    newServer.start();
+                    return new Pair<>(new AtomicInteger(0), newServer);
+                  })
+              .getRight();
+      PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet();
     }
   }
 
@@ -94,7 +106,7 @@ public class WebSocketConnector implements PipeConnector {
     long commitId = commitIdGenerator.incrementAndGet();
     ((EnrichedEvent) tabletInsertionEvent)
         .increaseReferenceCount(WebSocketConnector.class.getName());
-    server.get().addEvent(new Pair<>(commitId, tabletInsertionEvent));
+    server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
   }
 
   @Override
@@ -108,7 +120,7 @@ public class WebSocketConnector implements PipeConnector {
     for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
       long commitId = commitIdGenerator.incrementAndGet();
       ((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName());
-      server.get().addEvent(new Pair<>(commitId, event));
+      server.addEvent(new Pair<>(commitId, event));
     }
   }
 
@@ -117,7 +129,25 @@ public class WebSocketConnector implements PipeConnector {
 
   @Override
   public void close() throws Exception {
-    server.get().stop();
+    if (port == null) {
+      return;
+    }
+
+    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+      final Pair<AtomicInteger, WebSocketConnectorServer> pair =
+          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().stop();
+        } finally {
+          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port);
+        }
+      }
+    }
   }
 
   public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 9fde95da3fc..a6c0c59dac5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -38,16 +38,22 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.Supplier;
 
 public class PipeConnectorSubtaskManager {
 
+  private static final Map<String, Supplier<PipeConnector>> CONNECTOR_CONSTRUCTORS =
+      new HashMap<>();
+
   private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
       "Failed to deregister PipeConnectorSubtask. No such subtask: ";
 
-  private final Map<String, PipeConnectorSubtaskLifeCycle>
+  private final Map<String, List<PipeConnectorSubtaskLifeCycle>>
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
@@ -58,55 +64,62 @@ public class PipeConnectorSubtaskManager {
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
-      // 1. Construct, validate and customize PipeConnector, and then handshake (create connection)
-      // with the target
+      final int connectorNum =
+          pipeConnectorParameters.getIntOrDefault(
+              PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+              PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+      final List<PipeConnectorSubtaskLifeCycle> pipeConnectorSubtaskLifeCycleList =
+          new ArrayList<>(connectorNum);
+
       final String connectorKey =
           pipeConnectorParameters.getStringOrDefault(
               PipeConnectorConstant.CONNECTOR_KEY,
               BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-
-      PipeConnector pipeConnector;
-      if (connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
-          || connectorKey.equals(
-              BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new IoTDBThriftSyncConnector();
-      } else if (connectorKey.equals(
-          BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new IoTDBThriftAsyncConnector();
-      } else if (connectorKey.equals(
-          BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new IoTDBLegacyPipeConnector();
-      } else if (connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new OpcUaConnector();
-      } else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) {
-        pipeConnector = new WebSocketConnector();
-      } else {
-        pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
-      }
-
-      try {
-        pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
-        pipeConnector.customize(
-            pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
-        pipeConnector.handshake();
-      } catch (Exception e) {
-        throw new PipeException(
-            "Failed to construct PipeConnector, because of " + e.getMessage(), e);
-      }
-
-      // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
+      // Shared pending queue for all subtasks
       final BoundedBlockingPendingQueue<Event> pendingQueue =
           new BoundedBlockingPendingQueue<>(
               PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
-      final PipeConnectorSubtask pipeConnectorSubtask =
-          new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
-      final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
-          new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
+
+      for (int i = 0; i < connectorNum; i++) {
+        final PipeConnector pipeConnector =
+            CONNECTOR_CONSTRUCTORS
+                .getOrDefault(
+                    connectorKey,
+                    () -> PipeAgent.plugin().reflectConnector(pipeConnectorParameters))
+                .get();
+
+        // 1. Construct, validate and customize PipeConnector, and then handshake (create
+        // connection) with the target
+        try {
+          pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
+          pipeConnector.customize(
+              pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
+          pipeConnector.handshake();
+        } catch (Exception e) {
+          throw new PipeException(
+              "Failed to construct PipeConnector, because of " + e.getMessage(), e);
+        }
+
+        // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
+        final PipeConnectorSubtask pipeConnectorSubtask =
+            new PipeConnectorSubtask(
+                String.format(
+                    "%s_%s_%s", attributeSortedString, pipeRuntimeEnvironment.getCreationTime(), i),
+                pendingQueue,
+                pipeConnector);
+        final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
+            new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
+        pipeConnectorSubtaskLifeCycleList.add(pipeConnectorSubtaskLifeCycle);
+      }
+
       attributeSortedString2SubtaskLifeCycleMap.put(
-          attributeSortedString, pipeConnectorSubtaskLifeCycle);
+          attributeSortedString, pipeConnectorSubtaskLifeCycleList);
     }
 
-    attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register();
+    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+      lifeCycle.register();
+    }
 
     return attributeSortedString;
   }
@@ -116,7 +129,11 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
     }
 
-    if (attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister()) {
+    final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
+    lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister);
+
+    if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
   }
@@ -126,7 +143,10 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
     }
 
-    attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
+    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+      lifeCycle.start();
+    }
   }
 
   public synchronized void stop(String attributeSortedString) {
@@ -134,7 +154,10 @@ public class PipeConnectorSubtaskManager {
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
     }
 
-    attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
+    for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+      lifeCycle.stop();
+    }
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
@@ -144,13 +167,32 @@ public class PipeConnectorSubtaskManager {
           "Failed to get PendingQueue. No such subtask: " + attributeSortedString);
     }
 
-    return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
+    // All subtasks share the same pending queue
+    return attributeSortedString2SubtaskLifeCycleMap
+        .get(attributeSortedString)
+        .get(0)
+        .getPendingQueue();
   }
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
   private PipeConnectorSubtaskManager() {
-    // Empty constructor
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
+        IoTDBThriftSyncConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
+        IoTDBThriftSyncConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
+        IoTDBThriftAsyncConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
+        IoTDBLegacyPipeConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new);
   }
 
   private static class PipeSubtaskManagerHolder {