You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/22 18:16:01 UTC
[iotdb] branch master updated: [IOTDB-5907] Pipe: pipe subtask scheduler (#9915)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f8516ed32d8 [IOTDB-5907] Pipe: pipe subtask scheduler (#9915)
f8516ed32d8 is described below
commit f8516ed32d89dd96211d9e5a1e5d458b4e16dec1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 23 02:15:54 2023 +0800
[IOTDB-5907] Pipe: pipe subtask scheduler (#9915)
* remove deletion event and introduce generic event to pipe engine
* remove getType() method of Event
* executeForAWhile -> executeOnce
* safely start pipe tasks
* pipe task scheduler
* rename package
---
.../statemachine/ConfigRegionStateMachine.java | 6 +-
.../manager/pipe/runtime/PipeMetaSyncer.java | 21 ++++-
.../builtin/connector/DoNothingConnector.java | 4 +-
.../builtin/connector/IoTDBThriftConnector.java | 4 +-
.../builtin/processor/DoNothingProcessor.java | 11 ++-
.../org/apache/iotdb/pipe/api/PipeConnector.java | 10 +--
.../org/apache/iotdb/pipe/api/PipeProcessor.java | 10 +--
.../iotdb/pipe/api/collector/EventCollector.java | 39 ++--------
.../org/apache/iotdb/pipe/api/event/Event.java | 6 +-
.../pipe/api/event/dml/deletion/DeletionEvent.java | 48 ------------
.../event/dml/insertion/TabletInsertionEvent.java | 6 --
.../event/dml/insertion/TsFileInsertionEvent.java | 6 --
.../PipeRealtimeDataRegionHybridCollector.java | 49 ++++++------
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 6 +-
.../event/realtime/PipeRealtimeCollectEvent.java | 6 --
.../event/view/collector/PipeEventCollector.java | 19 +----
.../execution/executor/PipeSubtaskExecutor.java | 24 ++++--
.../execution/scheduler/PipeSubtaskScheduler.java | 89 ++++++++++++++++++++++
.../execution/scheduler/PipeTaskScheduler.java | 74 ------------------
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 27 +++----
.../db/pipe/task/subtask/PipeProcessorSubtask.java | 26 +++----
.../iotdb/db/pipe/task/subtask/PipeSubtask.java | 34 +++++++--
.../core/collector/PipeRealtimeCollectTest.java | 20 +++--
23 files changed, 242 insertions(+), 303 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 53e7f9ae688..4964ba79774 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -64,7 +64,7 @@ public class ConfigRegionStateMachine
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRegionStateMachine.class);
private static final ExecutorService threadPool =
- IoTDBThreadPoolFactory.newCachedThreadPool("CQ-recovery");
+ IoTDBThreadPoolFactory.newCachedThreadPool("ConfigNode-Manager-Recovery");
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
private final ConfigPlanExecutor executor;
private ConfigManager configManager;
@@ -218,7 +218,9 @@ public class ConfigRegionStateMachine
// 2. For correctness: in cq recovery processing, it will use ConsensusManager which may be
// initialized after notifyLeaderChanged finished
threadPool.submit(() -> configManager.getCQManager().startCQScheduler());
- configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync();
+
+ threadPool.submit(
+ () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
} else {
LOGGER.info(
"Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index ba9bba61c6a..4d6ebb04ff0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -40,7 +40,8 @@ public class PipeMetaSyncer {
private static final ScheduledExecutorService SYNC_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PIPE_META_SYNC_SERVICE.getName());
- // TODO: make this configurable
+ // TODO: make these configurable
+ private static final long INITIAL_SYNC_DELAY_MINUTES = 3;
private static final long SYNC_INTERVAL_MINUTES = 3;
private final ConfigManager configManager;
@@ -52,15 +53,29 @@ public class PipeMetaSyncer {
}
public synchronized void start() {
+ while (configManager.getConsensusManager() == null) {
+ try {
+ LOGGER.info("consensus layer is not ready, sleep 1s...");
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("unexpected interruption during waiting for consensus layer ready.");
+ }
+ }
+
if (metaSyncFuture == null) {
metaSyncFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- SYNC_EXECUTOR, this::sync, 0, SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES);
+ SYNC_EXECUTOR,
+ this::sync,
+ INITIAL_SYNC_DELAY_MINUTES,
+ SYNC_INTERVAL_MINUTES,
+ TimeUnit.MINUTES);
LOGGER.info("PipeMetaSyncer is started successfully.");
}
}
- private void sync() {
+ private synchronized void sync() {
final TSStatus status = configManager.getProcedureManager().pipeMetaSync();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index 00f3b80424a..2522fdc66f6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -61,7 +61,7 @@ public class DoNothingConnector implements PipeConnector {
}
@Override
- public void transfer(DeletionEvent deletionEvent) {
+ public void transfer(Event event) {
// do nothing
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index e252e5be726..82ddd05ba3c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -67,7 +67,7 @@ public class IoTDBThriftConnector implements PipeConnector {
}
@Override
- public void transfer(DeletionEvent deletionEvent) {
+ public void transfer(Event event) {
throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 6a18f9a64e8..bc56a8bb3cf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -46,19 +46,18 @@ public class DoNothingProcessor implements PipeProcessor {
@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
throws IOException {
- eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+ eventCollector.collect(tabletInsertionEvent);
}
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
throws IOException {
- eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+ eventCollector.collect(tsFileInsertionEvent);
}
@Override
- public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
- throws IOException {
- eventCollector.collectDeletionEvent(deletionEvent);
+ public void process(Event event, EventCollector eventCollector) throws IOException {
+ eventCollector.collect(event);
}
@Override
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 5502de8bb37..6d74847e763 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.pipe.api;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -52,7 +52,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
* following 3 methods will be called: {@link
* PipeConnector#transfer(TabletInsertionEvent)}, {@link
* PipeConnector#transfer(TsFileInsertionEvent)} and {@link
- * PipeConnector#transfer(DeletionEvent)}.
+ * PipeConnector#transfer(Event)}.
* </ul>
* <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
* PipeConnector#close() } method will be called.
@@ -130,11 +130,11 @@ public interface PipeConnector extends PipePlugin {
void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception;
/**
- * This method is used to transfer the DeletionEvent.
+ * This method is used to transfer the Event.
*
- * @param deletionEvent DeletionEvent to be transferred
+ * @param event Event to be transferred
* @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
- void transfer(DeletionEvent deletionEvent) throws Exception;
+ void transfer(Event event) throws Exception;
}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
index 16a5e81ba6c..e94384d5f23 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -48,7 +48,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
* following 3 methods will be called: {@link
* PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
* PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
- * PipeProcessor#process(DeletionEvent, EventCollector)}.
+ * PipeProcessor#process(Event, EventCollector)}.
* <li>PipeConnector serializes the events into binaries and send them to sinks.
* </ul>
* <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
@@ -107,11 +107,11 @@ public interface PipeProcessor extends PipePlugin {
throws Exception;
/**
- * This method is called to process the DeletionEvent.
+ * This method is called to process the Event.
*
- * @param deletionEvent DeletionEvent to be processed
+ * @param event Event to be processed
* @param eventCollector used to collect result events after processing
* @throws Exception the user can throw errors if necessary
*/
- void process(DeletionEvent deletionEvent, EventCollector eventCollector) throws Exception;
+ void process(Event event, EventCollector eventCollector) throws Exception;
}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
index 2e53693d65b..a9ef1f0aa62 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/EventCollector.java
@@ -19,44 +19,21 @@
package org.apache.iotdb.pipe.api.collector;
-import org.apache.iotdb.pipe.api.PipeProcessor;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import java.io.IOException;
-/**
- * Used to collect events generated by {@link PipeProcessor#process(TabletInsertionEvent,
- * EventCollector)}, {@link PipeProcessor#process(TsFileInsertionEvent, EventCollector)} or {@link
- * PipeProcessor#process(DeletionEvent, EventCollector)}.
- */
+/** Used to collect events in pipe engine. */
public interface EventCollector {
/**
- * Collects an insertion event in form of TabletInsertionEvent.
- *
- * @param event TabletInsertionEvent to be collected
- * @throws IOException if any I/O errors occur
- * @see TabletInsertionEvent
- */
- void collectTabletInsertionEvent(TabletInsertionEvent event) throws IOException;
-
- /**
- * Collects an insertion event in form of TsFileInsertionEvent.
- *
- * @param event TsFileInsertionEvent to be collected
- * @throws IOException if any I/O errors occur
- * @see TsFileInsertionEvent
- */
- void collectTsFileInsertionEvent(TsFileInsertionEvent event) throws IOException;
-
- /**
- * Collects a deletion event.
+ * Collects an Event in pipe engine.
*
- * @param event DeletionEvent to be collected
+ * @param event Event to be collected
* @throws IOException if any I/O errors occur
- * @see DeletionEvent
+ * @see Event
+ * @see org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent
+ * @see org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent
*/
- void collectDeletionEvent(DeletionEvent event) throws IOException;
+ void collect(Event event) throws IOException;
}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
index f5d8d2fbcd3..74ddf9e47d2 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/Event.java
@@ -20,8 +20,4 @@
package org.apache.iotdb.pipe.api.event;
/** This interface is used to abstract events in collaboration tasks. */
-public interface Event {
-
- /** @return the type of the event */
- EventType getType();
-}
+public interface Event {}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
deleted file mode 100644
index d1fb966379c..00000000000
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/deletion/DeletionEvent.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.pipe.api.event.dml.deletion;
-
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/** DeletionEvent is used to define the event of deletion. */
-public interface DeletionEvent extends Event {
-
- /**
- * The method is used to get the path pattern of the deleted data.
- *
- * @return String
- */
- Path getPath();
-
- /**
- * The method is used to get the time range of the deleted data.
- *
- * @return TimeRange
- */
- TimeRange getTimeRange();
-
- @Override
- default EventType getType() {
- return EventType.DELETION;
- }
-}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index e353bf397e0..9fd6d89428c 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.pipe.api.event.dml.insertion;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import java.util.Iterator;
@@ -54,9 +53,4 @@ public interface TabletInsertionEvent extends Event {
* RowCollector
*/
TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer);
-
- @Override
- default EventType getType() {
- return EventType.TABLET_INSERTION;
- }
}
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
index 2d5badc8072..325f0683cb7 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TsFileInsertionEvent.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.pipe.api.event.dml.insertion;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
@@ -42,9 +41,4 @@ public interface TsFileInsertionEvent extends Event {
* @return TsFileInsertionEvent
*/
TsFileInsertionEvent toTsFileInsertionEvent(Iterable<TabletInsertionEvent> iterable);
-
- @Override
- default EventType getType() {
- return EventType.TSFILE_INSERTION;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 67d461e8c8a..898b93cb22d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
import org.slf4j.Logger;
@@ -48,18 +50,17 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
@Override
public void collect(PipeRealtimeCollectEvent event) {
- switch (event.getEvent().getType()) {
- case TABLET_INSERTION:
- collectTabletInsertion(event);
- break;
- case TSFILE_INSERTION:
- collectTsFileInsertion(event);
- break;
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Unsupported event type %s for Hybrid Realtime Collector %s",
- event.getEvent().getType(), this));
+ final Event eventToCollect = event.getEvent();
+
+ if (eventToCollect instanceof TabletInsertionEvent) {
+ collectTabletInsertion(event);
+ } else if (eventToCollect instanceof TsFileInsertionEvent) {
+ collectTsFileInsertion(event);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported event type %s for Hybrid Realtime Collector %s",
+ eventToCollect.getClass(), this));
}
}
@@ -107,18 +108,18 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
while (collectEvent != null) {
Event suppliedEvent;
- switch (collectEvent.getEvent().getType()) {
- case TABLET_INSERTION:
- suppliedEvent = supplyTabletInsertion(collectEvent);
- break;
- case TSFILE_INSERTION:
- suppliedEvent = supplyTsFileInsertion(collectEvent);
- break;
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Unsupported event type %s for Hybrid Realtime Collector %s",
- collectEvent.getEvent().getType(), this));
+
+ // used to judge type of event, not directly for supplying.
+ final Event eventToSupply = collectEvent.getEvent();
+ if (eventToSupply instanceof TabletInsertionEvent) {
+ suppliedEvent = supplyTabletInsertion(collectEvent);
+ } else if (eventToSupply instanceof TsFileInsertionEvent) {
+ suppliedEvent = supplyTsFileInsertion(collectEvent);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported event type %s for Hybrid Realtime Collector %s",
+ eventToSupply.getClass(), this));
}
collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 32408769579..6b414c03b36 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -218,8 +218,8 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
}
@Override
- public void transfer(DeletionEvent deletionEvent) throws Exception {
- throw new NotImplementedException("Not implement for deletion event.");
+ public void transfer(Event event) {
+ LOGGER.warn("IoTDBThriftConnectorV1 does not support transfer generic event: {}.", event);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 63fcc891361..0c63131e6e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
import java.util.Map;
@@ -55,11 +54,6 @@ public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
device2Measurements = null;
}
- @Override
- public EventType getType() {
- return event.getType();
- }
-
@Override
public boolean increaseReferenceCount(String holderMessage) {
return !(event instanceof EnrichedEvent)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index a1f443a9d43..c38b7c28a67 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -23,9 +23,6 @@ import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.util.LinkedList;
import java.util.Queue;
@@ -48,21 +45,7 @@ public class PipeEventCollector implements EventCollector {
}
@Override
- public void collectTabletInsertionEvent(TabletInsertionEvent event) {
- collect(event);
- }
-
- @Override
- public void collectTsFileInsertionEvent(TsFileInsertionEvent event) {
- collect(event);
- }
-
- @Override
- public void collectDeletionEvent(DeletionEvent event) {
- collect(event);
- }
-
- private synchronized void collect(Event event) {
+ public synchronized void collect(Event event) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index 762561546b3..d1befd3e08a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.execution.executor;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -44,7 +45,8 @@ public abstract class PipeSubtaskExecutor {
private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
- private int corePoolSize;
+ private final int corePoolSize;
+ private int runningSubtaskNumber;
protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) {
subtaskWorkerThreadPoolExecutor =
@@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor {
registeredIdSubtaskMapper = new ConcurrentHashMap<>();
this.corePoolSize = corePoolSize;
+ runningSubtaskNumber = 0;
}
/////////////////////// subtask management ///////////////////////
@@ -65,7 +68,10 @@ public abstract class PipeSubtaskExecutor {
}
registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask);
- subtask.bindExecutors(subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor);
+ subtask.bindExecutors(
+ subtaskWorkerThreadPoolExecutor,
+ subtaskCallbackListeningExecutor,
+ new PipeSubtaskScheduler(this));
}
public final synchronized void start(String subTaskID) {
@@ -82,6 +88,7 @@ public abstract class PipeSubtaskExecutor {
} else {
subtask.allowSubmittingSelf();
subtask.submitSelf();
+ ++runningSubtaskNumber;
LOGGER.info("The subtask {} is started to submit self.", subTaskID);
}
}
@@ -92,7 +99,9 @@ public abstract class PipeSubtaskExecutor {
return;
}
- registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf();
+ if (registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf()) {
+ --runningSubtaskNumber;
+ }
}
public final synchronized void deregister(String subTaskID) {
@@ -138,12 +147,11 @@ public abstract class PipeSubtaskExecutor {
return subtaskWorkerThreadPoolExecutor.isShutdown();
}
- public final void adjustExecutorThreadNumber(int threadNum) {
- corePoolSize = threadNum;
- throw new UnsupportedOperationException("Not implemented yet.");
+ public final int getCorePoolSize() {
+ return corePoolSize;
}
- public final int getExecutorThreadNumber() {
- return corePoolSize;
+ public final int getRunningSubtaskNumber() {
+ return runningSubtaskNumber;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
new file mode 100644
index 00000000000..dada354e743
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.execution.scheduler;
+
+import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
+
+/**
+ * Limits how many events a subtask may consume in a row: the first {@link #schedule()} after
+ * construction or {@link #reset()} opens a window bounded by an event count and a time duration,
+ * both scaled by the executor's corePoolSize / runningSubtaskNumber ratio.
+ */
+public class PipeSubtaskScheduler {
+
+  private final PipeSubtaskExecutor executor;
+
+  private boolean isFirstSchedule = true;
+
+  // TODO: make these two configurable
+  private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT = 10_000;
+  private int consumedEventCountCheckpointInterval;
+  private int consumedEventCount;
+
+  // in ms
+  private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION = 10 * 1000L;
+  private long timeDurationCheckpointInterval;
+  // System.nanoTime() reading (ns) taken when the window was opened; only differences are used
+  private long lastCheckTime;
+  private static final long NANOS_PER_MS = 1_000_000L;
+
+  public PipeSubtaskScheduler(PipeSubtaskExecutor executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * Asks for permission to consume one more event and counts the event when granted.
+   *
+   * @return true if another event may be consumed, false once the current window's event count or
+   *     time budget is used up
+   */
+  public boolean schedule() {
+    if (isFirstSchedule) {
+      isFirstSchedule = false;
+      adjustCheckpointIntervalBasedOnExecutorStatus();
+    } else if (consumedEventCount >= consumedEventCountCheckpointInterval
+        || (System.nanoTime() - lastCheckTime) / NANOS_PER_MS >= timeDurationCheckpointInterval) {
+      return false;
+    }
+    ++consumedEventCount;
+    return true;
+  }
+
+  /** Opens a new window and sizes both bounds from the executor's current status. */
+  private void adjustCheckpointIntervalBasedOnExecutorStatus() {
+    // 1. reset consumedEventCount and lastCheckTime (monotonic, immune to wall-clock jumps)
+    consumedEventCount = 0;
+    lastCheckTime = System.nanoTime();
+
+    // 2. adjust checkpoint interval (basic / running subtasks * core pool size, at least 1)
+    final int core = Math.max(1, executor.getCorePoolSize());
+    final int running = Math.max(1, executor.getRunningSubtaskNumber());
+    consumedEventCountCheckpointInterval =
+        Math.max(
+            1, (int) ((float) BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT / running * core));
+    timeDurationCheckpointInterval =
+        Math.max(1, (long) ((float) BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION / running * core));
+  }
+
+  /** Makes the next {@link #schedule()} call open a fresh window. */
+  public void reset() {
+    isFirstSchedule = true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
deleted file mode 100644
index 4f035ca6715..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.execution.scheduler;
-
-import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
-
-/**
- * PipeTaskScheduler is a singleton class that manages the numbers of threads used by
- * PipeTaskExecutors dynamically.
- */
-public class PipeTaskScheduler {
-
- private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager =
- PipeSubtaskExecutorManager.getInstance();
-
- public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) {
- // TODO: make it configurable by setting different parameters
- pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
- }
-
- public int getAssignerSubtaskExecutorThreadNum() {
- return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber();
- }
-
- public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) {
- // TODO: make it configurable by setting different parameters
- pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
- }
-
- public int getConnectorSubtaskExecutorThreadNum() {
- return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber();
- }
-
- public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) {
- // TODO: make it configurable by setting different parameters
- pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum);
- }
-
- public int getProcessorSubtaskExecutorThreadNum() {
- return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber();
- }
-
- ///////////////////////// Singleton Instance Holder /////////////////////////
-
- private PipeTaskScheduler() {}
-
- private static class PipeTaskSchedulerHolder {
- private static PipeTaskScheduler instance = null;
- }
-
- public static PipeTaskScheduler setupAndGetInstance() {
- if (PipeTaskSchedulerHolder.instance == null) {
- PipeTaskSchedulerHolder.instance = new PipeTaskScheduler();
- }
- return PipeTaskSchedulerHolder.instance;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 80f366eb096..742b2230fa6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
@@ -50,9 +49,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
this.outputPipeConnector = outputPipeConnector;
}
- // TODO: for a while
@Override
- protected synchronized void executeForAWhile() {
+ protected synchronized boolean executeOnce() {
try {
// TODO: reduce the frequency of heartbeat
outputPipeConnector.heartbeat();
@@ -65,23 +63,16 @@ public class PipeConnectorSubtask extends PipeSubtask {
// record this event for retrying on connection failure or other exceptions
lastEvent = event;
if (event == null) {
- return;
+ return false;
}
try {
- switch (event.getType()) {
- case TABLET_INSERTION:
- outputPipeConnector.transfer((TabletInsertionEvent) event);
- break;
- case TSFILE_INSERTION:
- outputPipeConnector.transfer((TsFileInsertionEvent) event);
- break;
- case DELETION:
- outputPipeConnector.transfer((DeletionEvent) event);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported event type: " + event.getClass().getName());
+ if (event instanceof TabletInsertionEvent) {
+ outputPipeConnector.transfer((TabletInsertionEvent) event);
+ } else if (event instanceof TsFileInsertionEvent) {
+ outputPipeConnector.transfer((TsFileInsertionEvent) event);
+ } else {
+ outputPipeConnector.transfer(event);
}
releaseLastEvent();
@@ -93,6 +84,8 @@ public class PipeConnectorSubtask extends PipeSubtask {
"Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
e);
}
+
+ return true;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index feb584fcaff..6a76beb02b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -51,28 +50,21 @@ public class PipeProcessorSubtask extends PipeSubtask {
}
@Override
- protected synchronized void executeForAWhile() throws Exception {
+ protected synchronized boolean executeOnce() throws Exception {
final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply();
// record the last event for retry when exception occurs
lastEvent = event;
if (event == null) {
- return;
+ return false;
}
try {
- switch (event.getType()) {
- case TABLET_INSERTION:
- pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
- break;
- case TSFILE_INSERTION:
- pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
- break;
- case DELETION:
- pipeProcessor.process((DeletionEvent) event, outputEventCollector);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported event type: " + event.getClass().getName());
+ if (event instanceof TabletInsertionEvent) {
+ pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+ } else if (event instanceof TsFileInsertionEvent) {
+ pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
+ } else {
+ pipeProcessor.process(event, outputEventCollector);
}
releaseLastEvent();
@@ -82,6 +74,8 @@ public class PipeProcessorSubtask extends PipeSubtask {
"Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
e);
}
+
+ return true;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index c770b983b9a..5ddff7b2f29 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.pipe.api.event.Event;
import com.google.common.util.concurrent.FutureCallback;
@@ -47,6 +48,8 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
+ private PipeSubtaskScheduler subtaskScheduler;
+
protected static final int MAX_RETRY_TIMES = 5;
private final AtomicInteger retryCount = new AtomicInteger(0);
protected Throwable lastFailedCause;
@@ -60,14 +63,25 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
public void bindExecutors(
ListeningExecutorService subtaskWorkerThreadPoolExecutor,
- ExecutorService subtaskCallbackListeningExecutor) {
+ ExecutorService subtaskCallbackListeningExecutor,
+ PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
+ this.subtaskScheduler = subtaskScheduler;
}
@Override
public Void call() throws Exception {
- executeForAWhile();
+ // if the scheduler allows to schedule, then try to consume an event
+ while (subtaskScheduler.schedule()) {
+ // if the event is consumed successfully, then continue to consume the next event
+ // otherwise, stop consuming
+ if (!executeOnce()) {
+ break;
+ }
+ }
+ // reset the scheduler to make sure that the scheduler can schedule again
+ subtaskScheduler.reset();
// wait for the callable to be decorated by Futures.addCallback in the executorService
// to make sure that the callback can be submitted again on success or failure.
@@ -76,7 +90,13 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
return null;
}
- protected abstract void executeForAWhile() throws Exception;
+ /**
+ * Tries to consume one event with the pipe plugin; call() invokes this method in a loop.
+ *
+ * @return true if an event was consumed, false if there was none, which ends the loop in call()
+ * @throws Exception if any error occurs when consuming the event
+ */
+ protected abstract boolean executeOnce() throws Exception;
@Override
public void onSuccess(Void result) {
@@ -125,8 +145,12 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
shouldStopSubmittingSelf.set(false);
}
- public void disallowSubmittingSelf() {
- shouldStopSubmittingSelf.set(true);
+ /**
+ * Atomically switches shouldStopSubmittingSelf to true.
+ * @return true only if this call flipped the flag from false to true, false if it was already true
+ */
+ public boolean disallowSubmittingSelf() {
+ return shouldStopSubmittingSelf.compareAndSet(false, true);
}
public boolean isSubmittingSelf() {
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index a2c3508b294..7cd705af588 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDa
import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.EventType;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.junit.After;
@@ -163,9 +163,9 @@ public class PipeRealtimeCollectTest {
Arrays.asList(
listen(
collectors[0],
- type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+ event -> event instanceof TabletInsertionEvent ? 1 : 2,
writeNum << 1),
- listen(collectors[1], typ2 -> 1, writeNum));
+ listen(collectors[1], event -> 1, writeNum));
try {
listenFutures.get(0).get(10, TimeUnit.MINUTES);
@@ -199,14 +199,14 @@ public class PipeRealtimeCollectTest {
Arrays.asList(
listen(
collectors[0],
- type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+ event -> event instanceof TabletInsertionEvent ? 1 : 2,
writeNum << 1),
- listen(collectors[1], typ2 -> 1, writeNum),
+ listen(collectors[1], event -> 1, writeNum),
listen(
collectors[2],
- type -> type.equals(EventType.TABLET_INSERTION) ? 1 : 2,
+ event -> event instanceof TabletInsertionEvent ? 1 : 2,
writeNum << 1),
- listen(collectors[3], typ2 -> 1, writeNum));
+ listen(collectors[3], event -> 1, writeNum));
try {
listenFutures.get(0).get(10, TimeUnit.MINUTES);
listenFutures.get(1).get(10, TimeUnit.MINUTES);
@@ -278,9 +278,7 @@ public class PipeRealtimeCollectTest {
}
private Future<?> listen(
- PipeRealtimeDataRegionCollector collector,
- Function<EventType, Integer> weight,
- int expectNum) {
+ PipeRealtimeDataRegionCollector collector, Function<Event, Integer> weight, int expectNum) {
return listenerService.submit(
() -> {
int eventNum = 0;
@@ -293,7 +291,7 @@ public class PipeRealtimeCollectTest {
throw new RuntimeException(e);
}
if (event != null) {
- eventNum += weight.apply(event.getType());
+ eventNum += weight.apply(event);
}
}
} finally {