You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/25 12:18:49 UTC

[iotdb] 01/05: reduce heartbeat frequency

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 31fa754dcb66f32edc9a23e2bb4b4c1d7888d1eb
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 23 17:24:02 2023 +0800

    reduce heartbeat frequency
---
 .../apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java  | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 742b2230fa6..df0ea7d1fcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -39,6 +39,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
   private final ListenableBlockingPendingQueue<Event> inputPendingQueue;
   private final PipeConnector outputPipeConnector;
 
+  private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
+  private int executeOnceInvokedTimes;
+
   /** @param taskID connectorAttributeSortedString */
   public PipeConnectorSubtask(
       String taskID,
@@ -47,13 +50,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
     super(taskID);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
+    executeOnceInvokedTimes = 0;
   }
 
   @Override
   protected synchronized boolean executeOnce() {
     try {
-      // TODO: reduce the frequency of heartbeat
-      outputPipeConnector.heartbeat();
+      if (executeOnceInvokedTimes++ % HEARTBEAT_CHECK_INTERVAL == 0) {
+        outputPipeConnector.heartbeat();
+      }
     } catch (Exception e) {
       throw new PipeConnectionException(
           "PipeConnector: failed to connect to the target system.", e);