You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/27 07:19:06 UTC
[iotdb] branch rel/1.2 updated: [To rel/1.2] Pipe: Added tablet event count and tsfile event count to PipeHeartbeatEvent reports (#11225)(#11224)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new aa8e0d04b27 [To rel/1.2] Pipe: Added tablet event count and tsfile event count to PipeHeartbeatEvent reports (#11225)(#11224)
aa8e0d04b27 is described below
commit aa8e0d04b27aebe3e3ea0cb10c642da1fc13c34e
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Wed Sep 27 15:18:58 2023 +0800
[To rel/1.2] Pipe: Added tablet event count and tsfile event count to PipeHeartbeatEvent reports (#11225)(#11224)
---
.../event/common/heartbeat/PipeHeartbeatEvent.java | 49 ++++++++++-
.../PipeRealtimeDataRegionHybridExtractor.java | 2 +-
.../pipe/task/connection/BlockingPendingQueue.java | 57 ++++++++++++-
.../db/pipe/task/connection/EnrichedDeque.java | 98 ++++++++++++++++++++++
.../pipe/task/connection/PipeEventCollector.java | 5 +-
.../connection/UnboundedBlockingPendingQueue.java | 60 -------------
6 files changed, 201 insertions(+), 70 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index ef5a7657b03..fd6724088b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
@@ -32,8 +33,6 @@ import com.lmax.disruptor.RingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Deque;
-
public class PipeHeartbeatEvent extends EnrichedEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
@@ -46,9 +45,20 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private long timeProcessed;
private long timeTransferred;
+ // Do not report disruptor tablet or tsFile size separately since
+ // the disruptor is usually nearly empty.
private int disruptorSize;
+
+ private int extractorQueueTabletSize;
+ private int extractorQueueTsFileSize;
private int extractorQueueSize;
+
+ private int bufferQueueTabletSize;
+ private int bufferQueueTsFileSize;
private int bufferQueueSize;
+
+ private int connectorQueueTabletSize;
+ private int connectorQueueTsFileSize;
private int connectorQueueSize;
private final boolean shouldPrintMessage;
@@ -145,18 +155,24 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
public void recordExtractorQueueSize(UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
+ extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+ extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
extractorQueueSize = pendingQueue.size();
}
}
- public void recordBufferQueueSize(Deque<Event> bufferQueue) {
+ public void recordBufferQueueSize(EnrichedDeque<Event> bufferQueue) {
if (shouldPrintMessage) {
+ bufferQueueTabletSize = bufferQueue.getTabletInsertionEventCount();
+ bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
bufferQueueSize = bufferQueue.size();
}
}
public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
+ connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+ connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
connectorQueueSize = pendingQueue.size();
}
}
@@ -175,10 +191,25 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : unknownMessage;
final String disruptorSizeMessage = Integer.toString(disruptorSize);
+
+ final String extractorQueueTabletSizeMessage =
+ timeAssigned != 0 ? Integer.toString(extractorQueueTabletSize) : unknownMessage;
+ final String extractorQueueTsFileSizeMessage =
+ timeAssigned != 0 ? Integer.toString(extractorQueueTsFileSize) : unknownMessage;
final String extractorQueueSizeMessage =
timeAssigned != 0 ? Integer.toString(extractorQueueSize) : unknownMessage;
+
+ final String bufferQueueTabletSizeMessage =
+ timeProcessed != 0 ? Integer.toString(bufferQueueTabletSize) : unknownMessage;
+ final String bufferQueueTsFileSizeMessage =
+ timeProcessed != 0 ? Integer.toString(bufferQueueTsFileSize) : unknownMessage;
final String bufferQueueSizeMessage =
timeProcessed != 0 ? Integer.toString(bufferQueueSize) : unknownMessage;
+
+ final String connectorQueueTabletSizeMessage =
+ timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) : unknownMessage;
+ final String connectorQueueTsFileSizeMessage =
+ timeProcessed != 0 ? Integer.toString(connectorQueueTsFileSize) : unknownMessage;
final String connectorQueueSizeMessage =
timeProcessed != 0 ? Integer.toString(connectorQueueSize) : unknownMessage;
@@ -199,10 +230,22 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
+ totalTimeMessage
+ ", disruptorSize="
+ disruptorSizeMessage
+ + ", extractorQueueTabletSize="
+ + extractorQueueTabletSizeMessage
+ + ", extractorQueueTsFileSize="
+ + extractorQueueTsFileSizeMessage
+ ", extractorQueueSize="
+ extractorQueueSizeMessage
+ + ", bufferQueueTabletSize="
+ + bufferQueueTabletSizeMessage
+ + ", bufferQueueTsFileSize="
+ + bufferQueueTsFileSizeMessage
+ ", bufferQueueSize="
+ bufferQueueSizeMessage
+ + ", connectorQueueTabletSize="
+ + connectorQueueTabletSizeMessage
+ + ", connectorQueueTsFileSize="
+ + connectorQueueTsFileSizeMessage
+ ", connectorQueueSize="
+ connectorQueueSizeMessage
+ "}";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 269e9ae1d6c..c6b66a22abb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -203,7 +203,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
}
private boolean isTsFileEventCountInQueueExceededLimit() {
- return pendingQueue.getTsfileInsertionEventCount()
+ return pendingQueue.getTsFileInsertionEventCount()
>= PipeConfig.getInstance().getPipeExtractorPendingQueueTsFileLimit();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 64b5220dffd..035d7ded94d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -21,12 +21,15 @@ package org.apache.iotdb.db.pipe.task.connection;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
public abstract class BlockingPendingQueue<E extends Event> {
@@ -36,15 +39,28 @@ public abstract class BlockingPendingQueue<E extends Event> {
private static final long MAX_BLOCKING_TIME_MS =
PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
+ private final AtomicInteger tabletInsertionEventCount;
+ private final AtomicInteger tsFileInsertionEventCount;
protected final BlockingQueue<E> pendingQueue;
protected BlockingPendingQueue(BlockingQueue<E> pendingQueue) {
this.pendingQueue = pendingQueue;
+ tabletInsertionEventCount = new AtomicInteger(0);
+ tsFileInsertionEventCount = new AtomicInteger(0);
}
public boolean waitedOffer(E event) {
try {
- return pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+ final boolean offered =
+ pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+ if (offered) {
+ if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.incrementAndGet();
+ } else if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.incrementAndGet();
+ }
+ }
+ return offered;
} catch (InterruptedException e) {
LOGGER.info("pending queue offer is interrupted.", e);
Thread.currentThread().interrupt();
@@ -53,12 +69,25 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public boolean directOffer(E event) {
- return pendingQueue.offer(event);
+ final boolean offered = pendingQueue.offer(event);
+ if (offered) {
+ if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.incrementAndGet();
+ } else if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.incrementAndGet();
+ }
+ }
+ return offered;
}
public boolean put(E event) {
try {
pendingQueue.put(event);
+ if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.incrementAndGet();
+ } else if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.incrementAndGet();
+ }
return true;
} catch (InterruptedException e) {
LOGGER.info("pending queue put is interrupted.", e);
@@ -68,13 +97,25 @@ public abstract class BlockingPendingQueue<E extends Event> {
}
public E directPoll() {
- return pendingQueue.poll();
+ final E event = pendingQueue.poll();
+ if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.decrementAndGet();
+ }
+ if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.decrementAndGet();
+ }
+ return event;
}
public E waitedPoll() {
E event = null;
try {
event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+ if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.decrementAndGet();
+ } else if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.decrementAndGet();
+ }
} catch (InterruptedException e) {
LOGGER.info("pending queue poll is interrupted.", e);
Thread.currentThread().interrupt();
@@ -84,6 +125,8 @@ public abstract class BlockingPendingQueue<E extends Event> {
public void clear() {
pendingQueue.clear();
+ tabletInsertionEventCount.set(0);
+ tsFileInsertionEventCount.set(0);
}
public void forEach(Consumer<? super E> action) {
@@ -93,4 +136,12 @@ public abstract class BlockingPendingQueue<E extends Event> {
public int size() {
return pendingQueue.size();
}
+
+ public int getTabletInsertionEventCount() {
+ return tabletInsertionEventCount.get();
+ }
+
+ public int getTsFileInsertionEventCount() {
+ return tsFileInsertionEventCount.get();
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
new file mode 100644
index 00000000000..a77a87c712f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.connection;
+
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Deque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public class EnrichedDeque<E extends Event> {
+
+ private final AtomicInteger tabletInsertionEventCount;
+ private final AtomicInteger tsFileInsertionEventCount;
+ protected final Deque<E> deque;
+
+ protected EnrichedDeque(Deque<E> deque) {
+ this.deque = deque;
+ tabletInsertionEventCount = new AtomicInteger(0);
+ tsFileInsertionEventCount = new AtomicInteger(0);
+ }
+
+ public boolean offer(E event) {
+ final boolean offered = deque.offer(event);
+ if (offered) {
+ if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.incrementAndGet();
+ } else if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.incrementAndGet();
+ }
+ }
+ return offered;
+ }
+
+ public E poll() {
+ final E event = deque.poll();
+ if (event instanceof TabletInsertionEvent) {
+ tabletInsertionEventCount.decrementAndGet();
+ }
+ if (event instanceof TsFileInsertionEvent) {
+ tsFileInsertionEventCount.decrementAndGet();
+ }
+ return event;
+ }
+
+ public void clear() {
+ deque.clear();
+ tabletInsertionEventCount.set(0);
+ tsFileInsertionEventCount.set(0);
+ }
+
+ public int size() {
+ return deque.size();
+ }
+
+ public void forEach(Consumer<? super E> action) {
+ deque.forEach(action);
+ }
+
+ public E peek() {
+ return deque.peek();
+ }
+
+ public E peekLast() {
+ return deque.peekLast();
+ }
+
+ public boolean isEmpty() {
+ return deque.isEmpty();
+ }
+
+ public int getTabletInsertionEventCount() {
+ return tabletInsertionEventCount.get();
+ }
+
+ public int getTsFileInsertionEventCount() {
+ return tsFileInsertionEventCount.get();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index a1e5fab5fd6..c4368c7eda4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -24,18 +24,17 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
-import java.util.Deque;
import java.util.LinkedList;
public class PipeEventCollector implements EventCollector, AutoCloseable {
private final BoundedBlockingPendingQueue<Event> pendingQueue;
- private final Deque<Event> bufferQueue;
+ private final EnrichedDeque<Event> bufferQueue;
public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
this.pendingQueue = pendingQueue;
- bufferQueue = new LinkedList<>();
+ bufferQueue = new EnrichedDeque<>(new LinkedList<>());
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
index 7a1dce27548..343621bbb4a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -20,80 +20,20 @@
package org.apache.iotdb.db.pipe.task.connection;
import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
private final BlockingDeque<E> pendingDeque;
- private final AtomicInteger tsfileInsertionEventCount;
-
public UnboundedBlockingPendingQueue() {
super(new LinkedBlockingDeque<>());
pendingDeque = (BlockingDeque<E>) pendingQueue;
- tsfileInsertionEventCount = new AtomicInteger(0);
- }
-
- @Override
- public boolean waitedOffer(E event) {
- final boolean offered = super.waitedOffer(event);
- if (offered && event instanceof TsFileInsertionEvent) {
- tsfileInsertionEventCount.incrementAndGet();
- }
- return offered;
- }
-
- @Override
- public boolean directOffer(E event) {
- final boolean offered = super.directOffer(event);
- if (offered && event instanceof TsFileInsertionEvent) {
- tsfileInsertionEventCount.incrementAndGet();
- }
- return offered;
- }
-
- @Override
- public boolean put(E event) {
- final boolean putSuccessfully = super.put(event);
- if (putSuccessfully && event instanceof TsFileInsertionEvent) {
- tsfileInsertionEventCount.incrementAndGet();
- }
- return putSuccessfully;
- }
-
- @Override
- public E directPoll() {
- final E event = super.directPoll();
- if (event instanceof TsFileInsertionEvent) {
- tsfileInsertionEventCount.decrementAndGet();
- }
- return event;
- }
-
- @Override
- public E waitedPoll() {
- final E event = super.waitedPoll();
- if (event instanceof TsFileInsertionEvent) {
- tsfileInsertionEventCount.decrementAndGet();
- }
- return event;
- }
-
- @Override
- public void clear() {
- super.clear();
- tsfileInsertionEventCount.set(0);
}
public E peekLast() {
return pendingDeque.peekLast();
}
-
- public int getTsfileInsertionEventCount() {
- return tsfileInsertionEventCount.get();
- }
}