You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/22 09:31:27 UTC
[iotdb] branch rel/1.2 updated: [To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new ab2e107b246 [To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)
ab2e107b246 is described below
commit ab2e107b246686949c2313fbc914f256e2392042
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Fri Sep 22 17:31:21 2023 +0800
[To rel/1.2][IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11185)
* [IOTDB-6143] Pipe: Support PipeConnector subtasks with the same parameters concurrently scheduling (#11083)
Co-authored-by: Steve Yurong Su <ro...@apache.org>
(cherry picked from commit 4d7aaeb8c9c46b5640d8b66b1193bc23c2a569db)
* remove airgapConnector entrance
* remove IT
---
.../config/constant/PipeConnectorConstant.java | 6 +
.../connector/protocol/opcua/OpcUaConnector.java | 72 ++++++++---
.../protocol/websocket/WebSocketConnector.java | 56 +++++++--
.../connector/PipeConnectorSubtaskManager.java | 132 ++++++++++++++-------
4 files changed, 194 insertions(+), 72 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 8135ae577ad..3414b7a9b29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.config.constant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
public class PipeConnectorConstant {
@@ -29,6 +31,10 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls";
+ public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = "connector.parallel.tasks";
+ public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
+ PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = "connector.batch.enable";
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE = true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 703e4652d99..bc86efb6ec1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -28,9 +28,11 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
@@ -43,7 +45,10 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
@@ -63,6 +68,10 @@ public class OpcUaConnector implements PipeConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaConnector.class);
+ private static final Map<String, Pair<AtomicInteger, OpcUaServer>>
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+ private String serverKey;
private OpcUaServer server;
@Override
@@ -86,14 +95,31 @@ public class OpcUaConnector implements PipeConnector {
parameters.getStringOrDefault(
CONNECTOR_IOTDB_PASSWORD_KEY, CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
- server =
- new OpcUaServerBuilder()
- .setTcpBindPort(tcpBindPort)
- .setHttpsBindPort(httpsBindPort)
- .setUser(user)
- .setPassword(password)
- .build();
- server.startup();
+ synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ serverKey = httpsBindPort + ":" + tcpBindPort;
+
+ server =
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP
+ .computeIfAbsent(
+ serverKey,
+ key -> {
+ try {
+ final OpcUaServer newServer =
+ new OpcUaServerBuilder()
+ .setTcpBindPort(tcpBindPort)
+ .setHttpsBindPort(httpsBindPort)
+ .setUser(user)
+ .setPassword(password)
+ .build();
+ newServer.startup();
+ return new Pair<>(new AtomicInteger(0), newServer);
+ } catch (Exception e) {
+ throw new PipeException("Failed to build and startup OpcUaServer", e);
+ }
+ })
+ .getRight();
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey).getLeft().incrementAndGet();
+ }
}
@Override
@@ -106,6 +132,11 @@ public class OpcUaConnector implements PipeConnector {
// Server side, do nothing
}
+ @Override
+ public void transfer(Event event) throws Exception {
+ // Do nothing when receive heartbeat or other events
+ }
+
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
@@ -234,13 +265,26 @@ public class OpcUaConnector implements PipeConnector {
}
}
- @Override
- public void transfer(Event event) throws Exception {
- // Do nothing when receive heartbeat or other events
- }
-
@Override
public void close() throws Exception {
- server.shutdown();
+ if (serverKey == null) {
+ return;
+ }
+
+ synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ final Pair<AtomicInteger, OpcUaServer> pair =
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(serverKey);
+ if (pair == null) {
+ return;
+ }
+
+ if (pair.getLeft().decrementAndGet() <= 0) {
+ try {
+ pair.getRight().shutdown();
+ } finally {
+ SERVER_KEY_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(serverKey);
+ }
+ }
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 879cd9e2423..37b12170582 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -39,15 +39,21 @@ import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.Comparator;
+import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
public class WebSocketConnector implements PipeConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnector.class);
- private final AtomicReference<WebSocketConnectorServer> server = new AtomicReference<>();
- private int port;
+
+ private static final Map<Integer, Pair<AtomicInteger, WebSocketConnectorServer>>
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
+
+ private Integer port;
+ private WebSocketConnectorServer server;
public final AtomicLong commitIdGenerator = new AtomicLong(0);
private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -68,13 +74,19 @@ public class WebSocketConnector implements PipeConnector {
@Override
public void handshake() throws Exception {
- if (server.get() == null) {
- synchronized (server) {
- if (server.get() == null) {
- server.set(new WebSocketConnectorServer(new InetSocketAddress(port), this));
- server.get().start();
- }
- }
+ synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ server =
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP
+ .computeIfAbsent(
+ port,
+ key -> {
+ final WebSocketConnectorServer newServer =
+ new WebSocketConnectorServer(new InetSocketAddress(port), this);
+ newServer.start();
+ return new Pair<>(new AtomicInteger(0), newServer);
+ })
+ .getRight();
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet();
}
}
@@ -94,7 +106,7 @@ public class WebSocketConnector implements PipeConnector {
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) tabletInsertionEvent)
.increaseReferenceCount(WebSocketConnector.class.getName());
- server.get().addEvent(new Pair<>(commitId, tabletInsertionEvent));
+ server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
}
@Override
@@ -108,7 +120,7 @@ public class WebSocketConnector implements PipeConnector {
for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
long commitId = commitIdGenerator.incrementAndGet();
((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName());
- server.get().addEvent(new Pair<>(commitId, event));
+ server.addEvent(new Pair<>(commitId, event));
}
}
@@ -117,7 +129,25 @@ public class WebSocketConnector implements PipeConnector {
@Override
public void close() throws Exception {
- server.get().stop();
+ if (port == null) {
+ return;
+ }
+
+ synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
+ final Pair<AtomicInteger, WebSocketConnectorServer> pair =
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port);
+ if (pair == null) {
+ return;
+ }
+
+ if (pair.getLeft().decrementAndGet() <= 0) {
+ try {
+ pair.getRight().stop();
+ } finally {
+ PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port);
+ }
+ }
+ }
}
public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 9fde95da3fc..a6c0c59dac5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -38,16 +38,22 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.function.Supplier;
public class PipeConnectorSubtaskManager {
+ private static final Map<String, Supplier<PipeConnector>> CONNECTOR_CONSTRUCTORS =
+ new HashMap<>();
+
private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
"Failed to deregister PipeConnectorSubtask. No such subtask: ";
- private final Map<String, PipeConnectorSubtaskLifeCycle>
+ private final Map<String, List<PipeConnectorSubtaskLifeCycle>>
attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
public synchronized String register(
@@ -58,55 +64,62 @@ public class PipeConnectorSubtaskManager {
new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
- // 1. Construct, validate and customize PipeConnector, and then handshake (create connection)
- // with the target
+ final int connectorNum =
+ pipeConnectorParameters.getIntOrDefault(
+ PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
+ PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+ final List<PipeConnectorSubtaskLifeCycle> pipeConnectorSubtaskLifeCycleList =
+ new ArrayList<>(connectorNum);
+
final String connectorKey =
pipeConnectorParameters.getStringOrDefault(
PipeConnectorConstant.CONNECTOR_KEY,
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName());
-
- PipeConnector pipeConnector;
- if (connectorKey.equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
- || connectorKey.equals(
- BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName())) {
- pipeConnector = new IoTDBThriftSyncConnector();
- } else if (connectorKey.equals(
- BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName())) {
- pipeConnector = new IoTDBThriftAsyncConnector();
- } else if (connectorKey.equals(
- BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) {
- pipeConnector = new IoTDBLegacyPipeConnector();
- } else if (connectorKey.equals(BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName())) {
- pipeConnector = new OpcUaConnector();
- } else if (connectorKey.equals(BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName())) {
- pipeConnector = new WebSocketConnector();
- } else {
- pipeConnector = PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
- }
-
- try {
- pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
- pipeConnector.customize(
- pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
- pipeConnector.handshake();
- } catch (Exception e) {
- throw new PipeException(
- "Failed to construct PipeConnector, because of " + e.getMessage(), e);
- }
-
- // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
+ // Shared pending queue for all subtasks
final BoundedBlockingPendingQueue<Event> pendingQueue =
new BoundedBlockingPendingQueue<>(
PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
- final PipeConnectorSubtask pipeConnectorSubtask =
- new PipeConnectorSubtask(attributeSortedString, pendingQueue, pipeConnector);
- final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
- new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
+
+ for (int i = 0; i < connectorNum; i++) {
+ final PipeConnector pipeConnector =
+ CONNECTOR_CONSTRUCTORS
+ .getOrDefault(
+ connectorKey,
+ () -> PipeAgent.plugin().reflectConnector(pipeConnectorParameters))
+ .get();
+
+ // 1. Construct, validate and customize PipeConnector, and then handshake (create
+ // connection) with the target
+ try {
+ pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
+ pipeConnector.customize(
+ pipeConnectorParameters, new PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
+ pipeConnector.handshake();
+ } catch (Exception e) {
+ throw new PipeException(
+ "Failed to construct PipeConnector, because of " + e.getMessage(), e);
+ }
+
+ // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
+ final PipeConnectorSubtask pipeConnectorSubtask =
+ new PipeConnectorSubtask(
+ String.format(
+ "%s_%s_%s", attributeSortedString, pipeRuntimeEnvironment.getCreationTime(), i),
+ pendingQueue,
+ pipeConnector);
+ final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
+ new PipeConnectorSubtaskLifeCycle(executor, pipeConnectorSubtask, pendingQueue);
+ pipeConnectorSubtaskLifeCycleList.add(pipeConnectorSubtaskLifeCycle);
+ }
+
attributeSortedString2SubtaskLifeCycleMap.put(
- attributeSortedString, pipeConnectorSubtaskLifeCycle);
+ attributeSortedString, pipeConnectorSubtaskLifeCycleList);
}
- attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).register();
+ for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+ lifeCycle.register();
+ }
return attributeSortedString;
}
@@ -116,7 +129,11 @@ public class PipeConnectorSubtaskManager {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
}
- if (attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister()) {
+ final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
+ lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister);
+
+ if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
}
@@ -126,7 +143,10 @@ public class PipeConnectorSubtaskManager {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
}
- attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
+ for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+ lifeCycle.start();
+ }
}
public synchronized void stop(String attributeSortedString) {
@@ -134,7 +154,10 @@ public class PipeConnectorSubtaskManager {
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString);
}
- attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
+ for (final PipeConnectorSubtaskLifeCycle lifeCycle :
+ attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString)) {
+ lifeCycle.stop();
+ }
}
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
@@ -144,13 +167,32 @@ public class PipeConnectorSubtaskManager {
"Failed to get PendingQueue. No such subtask: " + attributeSortedString);
}
- return attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).getPendingQueue();
+ // All subtasks share the same pending queue
+ return attributeSortedString2SubtaskLifeCycleMap
+ .get(attributeSortedString)
+ .get(0)
+ .getPendingQueue();
}
///////////////////////// Singleton Instance Holder /////////////////////////
private PipeConnectorSubtaskManager() {
- // Empty constructor
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName(),
+ IoTDBThriftSyncConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName(),
+ IoTDBThriftSyncConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName(),
+ IoTDBThriftAsyncConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(),
+ IoTDBLegacyPipeConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), WebSocketConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), OpcUaConnector::new);
}
private static class PipeSubtaskManagerHolder {