You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/27 07:19:06 UTC

[iotdb] branch rel/1.2 updated: [To rel/1.2] Pipe: Added tablet event count and tsfile event count to PipeHeartbeatEvent reports (#11225)(#11224)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new aa8e0d04b27 [To rel/1.2] Pipe: Added tablet event count and tsfile event count to PipeHeartbeatEvent reports (#11225)(#11224)
aa8e0d04b27 is described below

commit aa8e0d04b27aebe3e3ea0cb10c642da1fc13c34e
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Wed Sep 27 15:18:58 2023 +0800

    [To rel/1.2] Pipe: Added tablet event count and tsfile event count to PipeHeartbeatEvent reports (#11225)(#11224)
---
 .../event/common/heartbeat/PipeHeartbeatEvent.java | 49 ++++++++++-
 .../PipeRealtimeDataRegionHybridExtractor.java     |  2 +-
 .../pipe/task/connection/BlockingPendingQueue.java | 57 ++++++++++++-
 .../db/pipe/task/connection/EnrichedDeque.java     | 98 ++++++++++++++++++++++
 .../pipe/task/connection/PipeEventCollector.java   |  5 +-
 .../connection/UnboundedBlockingPendingQueue.java  | 60 -------------
 6 files changed, 201 insertions(+), 70 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index ef5a7657b03..fd6724088b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
 import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -32,8 +33,6 @@ import com.lmax.disruptor.RingBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Deque;
-
 public class PipeHeartbeatEvent extends EnrichedEvent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
@@ -46,9 +45,20 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   private long timeProcessed;
   private long timeTransferred;
 
+  // Do not report disruptor tablet or tsFile size separately since
+  // the disruptor is usually nearly empty.
   private int disruptorSize;
+
+  private int extractorQueueTabletSize;
+  private int extractorQueueTsFileSize;
   private int extractorQueueSize;
+
+  private int bufferQueueTabletSize;
+  private int bufferQueueTsFileSize;
   private int bufferQueueSize;
+
+  private int connectorQueueTabletSize;
+  private int connectorQueueTsFileSize;
   private int connectorQueueSize;
 
   private final boolean shouldPrintMessage;
@@ -145,18 +155,24 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   public void recordExtractorQueueSize(UnboundedBlockingPendingQueue<Event> pendingQueue) {
     if (shouldPrintMessage) {
+      extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+      extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
       extractorQueueSize = pendingQueue.size();
     }
   }
 
-  public void recordBufferQueueSize(Deque<Event> bufferQueue) {
+  public void recordBufferQueueSize(EnrichedDeque<Event> bufferQueue) {
     if (shouldPrintMessage) {
+      bufferQueueTabletSize = bufferQueue.getTabletInsertionEventCount();
+      bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
       bufferQueueSize = bufferQueue.size();
     }
   }
 
   public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event> pendingQueue) {
     if (shouldPrintMessage) {
+      connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
+      connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
       connectorQueueSize = pendingQueue.size();
     }
   }
@@ -175,10 +191,25 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
         timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : unknownMessage;
 
     final String disruptorSizeMessage = Integer.toString(disruptorSize);
+
+    final String extractorQueueTabletSizeMessage =
+        timeAssigned != 0 ? Integer.toString(extractorQueueTabletSize) : unknownMessage;
+    final String extractorQueueTsFileSizeMessage =
+        timeAssigned != 0 ? Integer.toString(extractorQueueTsFileSize) : unknownMessage;
     final String extractorQueueSizeMessage =
         timeAssigned != 0 ? Integer.toString(extractorQueueSize) : unknownMessage;
+
+    final String bufferQueueTabletSizeMessage =
+        timeProcessed != 0 ? Integer.toString(bufferQueueTabletSize) : unknownMessage;
+    final String bufferQueueTsFileSizeMessage =
+        timeProcessed != 0 ? Integer.toString(bufferQueueTsFileSize) : unknownMessage;
     final String bufferQueueSizeMessage =
         timeProcessed != 0 ? Integer.toString(bufferQueueSize) : unknownMessage;
+
+    final String connectorQueueTabletSizeMessage =
+        timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) : unknownMessage;
+    final String connectorQueueTsFileSizeMessage =
+        timeProcessed != 0 ? Integer.toString(connectorQueueTsFileSize) : unknownMessage;
     final String connectorQueueSizeMessage =
         timeProcessed != 0 ? Integer.toString(connectorQueueSize) : unknownMessage;
 
@@ -199,10 +230,22 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
         + totalTimeMessage
         + ", disruptorSize="
         + disruptorSizeMessage
+        + ", extractorQueueTabletSize="
+        + extractorQueueTabletSizeMessage
+        + ", extractorQueueTsFileSize="
+        + extractorQueueTsFileSizeMessage
         + ", extractorQueueSize="
         + extractorQueueSizeMessage
+        + ", bufferQueueTabletSize="
+        + bufferQueueTabletSizeMessage
+        + ", bufferQueueTsFileSize="
+        + bufferQueueTsFileSizeMessage
         + ", bufferQueueSize="
         + bufferQueueSizeMessage
+        + ", connectorQueueTabletSize="
+        + connectorQueueTabletSizeMessage
+        + ", connectorQueueTsFileSize="
+        + connectorQueueTsFileSizeMessage
         + ", connectorQueueSize="
         + connectorQueueSizeMessage
         + "}";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 269e9ae1d6c..c6b66a22abb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -203,7 +203,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
   }
 
   private boolean isTsFileEventCountInQueueExceededLimit() {
-    return pendingQueue.getTsfileInsertionEventCount()
+    return pendingQueue.getTsFileInsertionEventCount()
         >= PipeConfig.getInstance().getPipeExtractorPendingQueueTsFileLimit();
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 64b5220dffd..035d7ded94d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -21,12 +21,15 @@ package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 public abstract class BlockingPendingQueue<E extends Event> {
@@ -36,15 +39,28 @@ public abstract class BlockingPendingQueue<E extends Event> {
   private static final long MAX_BLOCKING_TIME_MS =
       PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
 
+  private final AtomicInteger tabletInsertionEventCount;
+  private final AtomicInteger tsFileInsertionEventCount;
   protected final BlockingQueue<E> pendingQueue;
 
   protected BlockingPendingQueue(BlockingQueue<E> pendingQueue) {
     this.pendingQueue = pendingQueue;
+    tabletInsertionEventCount = new AtomicInteger(0);
+    tsFileInsertionEventCount = new AtomicInteger(0);
   }
 
   public boolean waitedOffer(E event) {
     try {
-      return pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+      final boolean offered =
+          pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+      if (offered) {
+        if (event instanceof TabletInsertionEvent) {
+          tabletInsertionEventCount.incrementAndGet();
+        } else if (event instanceof TsFileInsertionEvent) {
+          tsFileInsertionEventCount.incrementAndGet();
+        }
+      }
+      return offered;
     } catch (InterruptedException e) {
       LOGGER.info("pending queue offer is interrupted.", e);
       Thread.currentThread().interrupt();
@@ -53,12 +69,25 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public boolean directOffer(E event) {
-    return pendingQueue.offer(event);
+    final boolean offered = pendingQueue.offer(event);
+    if (offered) {
+      if (event instanceof TabletInsertionEvent) {
+        tabletInsertionEventCount.incrementAndGet();
+      } else if (event instanceof TsFileInsertionEvent) {
+        tsFileInsertionEventCount.incrementAndGet();
+      }
+    }
+    return offered;
   }
 
   public boolean put(E event) {
     try {
       pendingQueue.put(event);
+      if (event instanceof TabletInsertionEvent) {
+        tabletInsertionEventCount.incrementAndGet();
+      } else if (event instanceof TsFileInsertionEvent) {
+        tsFileInsertionEventCount.incrementAndGet();
+      }
       return true;
     } catch (InterruptedException e) {
       LOGGER.info("pending queue put is interrupted.", e);
@@ -68,13 +97,25 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public E directPoll() {
-    return pendingQueue.poll();
+    final E event = pendingQueue.poll();
+    if (event instanceof TabletInsertionEvent) {
+      tabletInsertionEventCount.decrementAndGet();
+    }
+    if (event instanceof TsFileInsertionEvent) {
+      tsFileInsertionEventCount.decrementAndGet();
+    }
+    return event;
   }
 
   public E waitedPoll() {
     E event = null;
     try {
       event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
+      if (event instanceof TabletInsertionEvent) {
+        tabletInsertionEventCount.decrementAndGet();
+      } else if (event instanceof TsFileInsertionEvent) {
+        tsFileInsertionEventCount.decrementAndGet();
+      }
     } catch (InterruptedException e) {
       LOGGER.info("pending queue poll is interrupted.", e);
       Thread.currentThread().interrupt();
@@ -84,6 +125,8 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   public void clear() {
     pendingQueue.clear();
+    tabletInsertionEventCount.set(0);
+    tsFileInsertionEventCount.set(0);
   }
 
   public void forEach(Consumer<? super E> action) {
@@ -93,4 +136,12 @@ public abstract class BlockingPendingQueue<E extends Event> {
   public int size() {
     return pendingQueue.size();
   }
+
+  public int getTabletInsertionEventCount() {
+    return tabletInsertionEventCount.get();
+  }
+
+  public int getTsFileInsertionEventCount() {
+    return tsFileInsertionEventCount.get();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
new file mode 100644
index 00000000000..a77a87c712f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.connection;
+
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Deque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public class EnrichedDeque<E extends Event> {
+
+  private final AtomicInteger tabletInsertionEventCount;
+  private final AtomicInteger tsFileInsertionEventCount;
+  protected final Deque<E> deque;
+
+  protected EnrichedDeque(Deque<E> deque) {
+    this.deque = deque;
+    tabletInsertionEventCount = new AtomicInteger(0);
+    tsFileInsertionEventCount = new AtomicInteger(0);
+  }
+
+  public boolean offer(E event) {
+    final boolean offered = deque.offer(event);
+    if (offered) {
+      if (event instanceof TabletInsertionEvent) {
+        tabletInsertionEventCount.incrementAndGet();
+      } else if (event instanceof TsFileInsertionEvent) {
+        tsFileInsertionEventCount.incrementAndGet();
+      }
+    }
+    return offered;
+  }
+
+  public E poll() {
+    final E event = deque.poll();
+    if (event instanceof TabletInsertionEvent) {
+      tabletInsertionEventCount.decrementAndGet();
+    }
+    if (event instanceof TsFileInsertionEvent) {
+      tsFileInsertionEventCount.decrementAndGet();
+    }
+    return event;
+  }
+
+  public void clear() {
+    deque.clear();
+    tabletInsertionEventCount.set(0);
+    tsFileInsertionEventCount.set(0);
+  }
+
+  public int size() {
+    return deque.size();
+  }
+
+  public void forEach(Consumer<? super E> action) {
+    deque.forEach(action);
+  }
+
+  public E peek() {
+    return deque.peek();
+  }
+
+  public E peekLast() {
+    return deque.peekLast();
+  }
+
+  public boolean isEmpty() {
+    return deque.isEmpty();
+  }
+
+  public int getTabletInsertionEventCount() {
+    return tabletInsertionEventCount.get();
+  }
+
+  public int getTsFileInsertionEventCount() {
+    return tsFileInsertionEventCount.get();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index a1e5fab5fd6..c4368c7eda4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -24,18 +24,17 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 
-import java.util.Deque;
 import java.util.LinkedList;
 
 public class PipeEventCollector implements EventCollector, AutoCloseable {
 
   private final BoundedBlockingPendingQueue<Event> pendingQueue;
 
-  private final Deque<Event> bufferQueue;
+  private final EnrichedDeque<Event> bufferQueue;
 
   public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
     this.pendingQueue = pendingQueue;
-    bufferQueue = new LinkedList<>();
+    bufferQueue = new EnrichedDeque<>(new LinkedList<>());
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
index 7a1dce27548..343621bbb4a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -20,80 +20,20 @@
 package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPendingQueue<E> {
 
   private final BlockingDeque<E> pendingDeque;
 
-  private final AtomicInteger tsfileInsertionEventCount;
-
   public UnboundedBlockingPendingQueue() {
     super(new LinkedBlockingDeque<>());
     pendingDeque = (BlockingDeque<E>) pendingQueue;
-    tsfileInsertionEventCount = new AtomicInteger(0);
-  }
-
-  @Override
-  public boolean waitedOffer(E event) {
-    final boolean offered = super.waitedOffer(event);
-    if (offered && event instanceof TsFileInsertionEvent) {
-      tsfileInsertionEventCount.incrementAndGet();
-    }
-    return offered;
-  }
-
-  @Override
-  public boolean directOffer(E event) {
-    final boolean offered = super.directOffer(event);
-    if (offered && event instanceof TsFileInsertionEvent) {
-      tsfileInsertionEventCount.incrementAndGet();
-    }
-    return offered;
-  }
-
-  @Override
-  public boolean put(E event) {
-    final boolean putSuccessfully = super.put(event);
-    if (putSuccessfully && event instanceof TsFileInsertionEvent) {
-      tsfileInsertionEventCount.incrementAndGet();
-    }
-    return putSuccessfully;
-  }
-
-  @Override
-  public E directPoll() {
-    final E event = super.directPoll();
-    if (event instanceof TsFileInsertionEvent) {
-      tsfileInsertionEventCount.decrementAndGet();
-    }
-    return event;
-  }
-
-  @Override
-  public E waitedPoll() {
-    final E event = super.waitedPoll();
-    if (event instanceof TsFileInsertionEvent) {
-      tsfileInsertionEventCount.decrementAndGet();
-    }
-    return event;
-  }
-
-  @Override
-  public void clear() {
-    super.clear();
-    tsfileInsertionEventCount.set(0);
   }
 
   public E peekLast() {
     return pendingDeque.peekLast();
   }
-
-  public int getTsfileInsertionEventCount() {
-    return tsfileInsertionEventCount.get();
-  }
 }