You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/08/05 05:51:19 UTC
[iotdb] branch master updated: [IOTDB-6100] Pipe: Fix running in hybrid mode will cause wal cannot be deleted & some pipe data lost due to wrong ProducerType of Disruptor (#10790)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3def1f80976 [IOTDB-6100] Pipe: Fix running in hybrid mode will cause wal cannot be deleted & some pipe data lost due to wrong ProducerType of Disruptor (#10790)
3def1f80976 is described below
commit 3def1f809768073cdfa9578bc1abd4fead569ac8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sat Aug 5 13:51:14 2023 +0800
[IOTDB-6100] Pipe: Fix running in hybrid mode will cause wal cannot be deleted & some pipe data lost due to wrong ProducerType of Disruptor (#10790)
* Fix: Running in hybrid mode will cause wal cannot be deleted
* Fix: some pipe data is lost due to wrong ProducerType of Disruptor
---
.../PipeRealtimeDataRegionHybridExtractor.java | 90 ++++++++++++++++------
.../PipeRealtimeDataRegionLogExtractor.java | 22 ++++--
.../PipeRealtimeDataRegionTsFileExtractor.java | 24 +++---
.../realtime/assigner/DisruptorQueue.java | 85 ++++++++------------
.../realtime/assigner/PipeDataRegionAssigner.java | 10 +--
.../db/pipe/resource/wal/PipeWALResource.java | 4 +
.../pipe/resource/wal/PipeWALResourceManager.java | 12 ++-
.../iotdb/commons/concurrent/ThreadName.java | 2 +
8 files changed, 147 insertions(+), 102 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index ffefb13daf0..9b8ff792be4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -73,23 +73,41 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
private void extractTabletInsertion(PipeRealtimeEvent event) {
if (isApproachingCapacity()) {
- event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
// if the pending queue is approaching capacity, we should not extract any more tablet events.
// all the data represented by the tablet events should be carried by the following tsfile
// event.
- return;
+ event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
}
- if (!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
- && !pendingQueue.waitedOffer(event)) {
- LOGGER.warn(
- "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
- + "has reached capacity, discard tablet event {}, current state {}",
- this,
- event,
- event.getTsFileEpoch().getState(this));
- // this would not happen, but just in case.
- // UnboundedBlockingPendingQueue is unbounded, so it should never reach capacity.
+ final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+ switch (state) {
+ case USING_TSFILE:
+ // Ignore the tablet event.
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ break;
+ case EMPTY:
+ case USING_TABLET:
+ if (!pendingQueue.waitedOffer(event)) {
+ // this would not happen, but just in case.
+ // pendingQueue is unbounded, so it should never reach capacity.
+ final String errorMessage =
+ String.format(
+ "extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s "
+ + "has reached capacity, discard tablet event %s, current state %s",
+ this, event, event.getTsFileEpoch().getState(this));
+ LOGGER.error(errorMessage);
+ PipeAgent.runtime()
+ .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+ // Ignore the tablet event.
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported state %s for hybrid realtime extractor %s",
+ state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
}
}
@@ -101,15 +119,35 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
state ->
state.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : state);
- if (!pendingQueue.waitedOffer(event)) {
- LOGGER.warn(
- "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
- + "has reached capacity, discard TsFile event {}, current state {}",
- this,
- event,
- event.getTsFileEpoch().getState(this));
- // this would not happen, but just in case.
- // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+ final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
+ switch (state) {
+ case EMPTY:
+ case USING_TSFILE:
+ if (!pendingQueue.waitedOffer(event)) {
+ // this would not happen, but just in case.
+ // pendingQueue is unbounded, so it should never reach capacity.
+ final String errorMessage =
+ String.format(
+ "extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s "
+ + "has reached capacity, discard TsFile event %s, current state %s",
+ this, event, event.getTsFileEpoch().getState(this));
+ LOGGER.error(errorMessage);
+ PipeAgent.runtime()
+ .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+ // Ignore the tsfile event.
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ }
+ break;
+ case USING_TABLET:
+ // All the tablet events have been extracted, so we can ignore the tsfile event.
+ event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unsupported state %s for hybrid realtime extractor %s",
+ state, PipeRealtimeDataRegionHybridExtractor.class.getName()));
}
}
@@ -139,6 +177,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
}
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
+
if (suppliedEvent != null) {
return suppliedEvent;
}
@@ -166,7 +205,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
// this event is not reliable anymore. but the data represented by this event
// has been carried by the following tsfile event, so we can just discard this event.
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
- LOGGER.warn("Increase reference count for event {} error.", event);
+ LOGGER.warn(
+ "Discard tablet event {} because it is not reliable anymore. "
+ + "Change the state of TsFileEpoch to USING_TSFILE.",
+ event);
return null;
}
}
@@ -182,7 +224,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
state -> {
// this would not happen, but just in case.
if (state.equals(TsFileEpoch.State.EMPTY)) {
- LOGGER.warn(
+ LOGGER.error(
String.format("EMPTY TsFileEpoch when supplying TsFile Event %s", event));
return TsFileEpoch.State.USING_TSFILE;
}
@@ -202,7 +244,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
event.getEvent());
- LOGGER.warn(errorMessage);
+ LOGGER.error(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
return null;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
index 74ff0c533b1..156c8891710 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java
@@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
if (!(event.getEvent() instanceof TabletInsertionEvent)) {
+ event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
return;
}
if (!pendingQueue.waitedOffer(event)) {
- LOGGER.warn(
- "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} "
- + "has reached capacity, discard tablet event {}, current state {}",
- this,
- event,
- event.getTsFileEpoch().getState(this));
// this would not happen, but just in case.
- // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+ // pendingQueue is unbounded, so it should never reach capacity.
+ final String errorMessage =
+ String.format(
+ "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s "
+ + "has reached capacity, discard tablet event %s, current state %s",
+ this, event, event.getTsFileEpoch().getState(this));
+ LOGGER.error(errorMessage);
+ PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+ // ignore this event.
+ event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
}
}
@@ -93,11 +98,12 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
realtimeEvent.getEvent());
- LOGGER.warn(errorMessage);
+ LOGGER.error(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
}
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
+
if (suppliedEvent != null) {
return suppliedEvent;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
index bab1ea2ec46..cae2e6f3c1b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java
@@ -48,18 +48,23 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
+ event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
return;
}
if (!pendingQueue.waitedOffer(event)) {
- LOGGER.warn(
- "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor {} "
- + "has reached capacity, discard TsFile event {}, current state {}",
- this,
- event,
- event.getTsFileEpoch().getState(this));
- // this would not happen, but just in case.
- // ListenableUnblockingPendingQueue is unbounded, so it should never reach capacity.
+ // This would not happen, but just in case.
+ // Pending is unbounded, so it should never reach capacity.
+ final String errorMessage =
+ String.format(
+ "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor %s "
+ + "has reached capacity, discard TsFile event %s, current state %s",
+ this, event, event.getTsFileEpoch().getState(this));
+ LOGGER.error(errorMessage);
+ PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
+
+ // Ignore the event.
+ event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
}
}
@@ -93,11 +98,12 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
+ "the reference count can not be increased, "
+ "the data represented by this event is lost",
realtimeEvent.getEvent());
- LOGGER.warn(errorMessage);
+ LOGGER.error(errorMessage);
PipeAgent.runtime().report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
}
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
+
if (suppliedEvent != null) {
return suppliedEvent;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 9c6d97901aa..b7823171da7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -19,81 +19,62 @@
package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
+import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
-import com.lmax.disruptor.util.DaemonThreadFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadFactory;
+import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
-public class DisruptorQueue<E> {
+public class DisruptorQueue {
- private Disruptor<Container<E>> disruptor;
- private RingBuffer<Container<E>> ringBuffer;
+ private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
+ new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
- private DisruptorQueue() {}
+ private final Disruptor<EventContainer> disruptor;
+ private final RingBuffer<EventContainer> ringBuffer;
- public void publish(E obj) {
- ringBuffer.publishEvent((container, sequence, o) -> container.setObj(o), obj);
+ public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
+ disruptor =
+ new Disruptor<>(
+ EventContainer::new,
+ PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(),
+ THREAD_FACTORY,
+ ProducerType.MULTI,
+ new BlockingWaitStrategy());
+ disruptor.handleEventsWith(
+ (container, sequence, endOfBatch) ->
+ eventHandler.onEvent(container.getEvent(), sequence, endOfBatch));
+ disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
+
+ ringBuffer = disruptor.start();
+ }
+
+ public void publish(PipeRealtimeEvent event) {
+ ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event);
}
public void clear() {
disruptor.halt();
}
- public static class Builder<E> {
- private int ringBufferSize =
- PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize();
- private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
- private ProducerType producerType = ProducerType.MULTI;
- private WaitStrategy waitStrategy = new BlockingWaitStrategy();
- private final List<EventHandler<E>> handlers = new ArrayList<>();
-
- public Builder<E> setProducerType(ProducerType producerType) {
- this.producerType = producerType;
- return this;
- }
-
- public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
- this.handlers.add(eventHandler);
- return this;
- }
-
- public DisruptorQueue<E> build() {
- DisruptorQueue<E> disruptorQueue = new DisruptorQueue<>();
- disruptorQueue.disruptor =
- new Disruptor<>(
- Container::new, ringBufferSize, threadFactory, producerType, waitStrategy);
- for (EventHandler<E> handler : handlers) {
- disruptorQueue.disruptor.handleEventsWith(
- (container, sequence, endOfBatch) ->
- handler.onEvent(container.getObj(), sequence, endOfBatch));
- }
- disruptorQueue.disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
- disruptorQueue.disruptor.start();
- disruptorQueue.ringBuffer = disruptorQueue.disruptor.getRingBuffer();
- return disruptorQueue;
- }
- }
+ private static class EventContainer {
- private static class Container<E> {
- private E obj;
+ private PipeRealtimeEvent event;
- private Container() {}
+ private EventContainer() {}
- public E getObj() {
- return obj;
+ public PipeRealtimeEvent getEvent() {
+ return event;
}
- public void setObj(E obj) {
- this.obj = obj;
+ public void setEvent(PipeRealtimeEvent event) {
+ this.event = event;
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index d5ca1a10b90..ee569e9e4ff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -24,23 +24,17 @@ import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtract
import org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
import org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher;
-import com.lmax.disruptor.dsl.ProducerType;
-
public class PipeDataRegionAssigner {
/** The matcher is used to match the event with the extractor based on the pattern. */
private final PipeDataRegionMatcher matcher;
/** The disruptor is used to assign the event to the extractor. */
- private final DisruptorQueue<PipeRealtimeEvent> disruptor;
+ private final DisruptorQueue disruptor;
public PipeDataRegionAssigner() {
this.matcher = new CachedSchemaPatternMatcher();
- this.disruptor =
- new DisruptorQueue.Builder<PipeRealtimeEvent>()
- .setProducerType(ProducerType.SINGLE)
- .addEventHandler(this::assignToExtractor)
- .build();
+ this.disruptor = new DisruptorQueue(this::assignToExtractor);
}
public void publishToAssign(PipeRealtimeEvent event) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 085c71d5f9c..358e319dafa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -162,4 +162,8 @@ public abstract class PipeWALResource implements Closeable {
referenceCount.set(0);
}
+
+ public int getReferenceCount() {
+ return referenceCount.get();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 64cd83f1a67..afbe6931533 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
@@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class PipeWALResourceManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResourceManager.class);
+
protected final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
private static final int SEGMENT_LOCK_COUNT = 32;
@@ -55,7 +60,7 @@ public abstract class PipeWALResourceManager {
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
PIPE_WAL_RESOURCE_TTL_CHECKER,
() -> {
- Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+ final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
memtableIdToPipeWALResourceMap.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<Long, PipeWALResource> entry = iterator.next();
@@ -66,6 +71,11 @@ public abstract class PipeWALResourceManager {
try {
if (entry.getValue().invalidateIfPossible()) {
iterator.remove();
+ } else {
+ LOGGER.info(
+ "WAL (memtableId {}) is still referenced {} times",
+ entry.getKey(),
+ entry.getValue().getReferenceCount());
}
} finally {
lock.unlock();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index aff32b22695..0382f963a42 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -122,6 +122,7 @@ public enum ThreadName {
GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
GROUP_MANAGEMENT("groupManagement"),
// -------------------------- Compute --------------------------
+ PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"),
PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"),
PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
@@ -257,6 +258,7 @@ public enum ThreadName {
private static final Set<ThreadName> computeThreadNames =
new HashSet<>(
Arrays.asList(
+ PIPE_EXTRACTOR_DISRUPTOR,
PIPE_ASSIGNER_EXECUTOR_POOL,
PIPE_PROCESSOR_EXECUTOR_POOL,
PIPE_CONNECTOR_EXECUTOR_POOL,