You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/28 09:55:48 UTC

[iotdb] 01/05: new config prop: pipeSubtaskCronEventInjectorExecutionIntervalSeconds

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-cause-wal-pin
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6f7d78721cc70124b503961bc483f9f6fd409bae
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Sep 28 12:23:24 2023 +0800

    new config prop: pipeSubtaskCronEventInjectorExecutionIntervalSeconds
---
 .../iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java    |  4 +++-
 .../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 11 +++++++++++
 .../java/org/apache/iotdb/commons/conf/CommonDescriptor.java  |  5 +++++
 .../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java |  4 ++++
 4 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
index 8ec9798e187..20d8028855c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.runtime;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 
 import org.slf4j.Logger;
@@ -35,7 +36,8 @@ public class PipeCronEventInjector {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeCronEventInjector.class);
 
-  private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 1;
+  private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
+      PipeConfig.getInstance().getPipeSubtaskCronEventInjectorExecutionIntervalSeconds();
 
   private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c96a70d07df..1bc039f2d54 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -161,6 +161,7 @@ public class CommonConfig {
   private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
+  private int pipeSubtaskCronEventInjectorExecutionIntervalSeconds = 10;
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private int pipeExtractorMatcherCacheSize = 1024;
@@ -711,6 +712,16 @@ public class CommonConfig {
         pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
 
+  public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() {
+    return pipeSubtaskCronEventInjectorExecutionIntervalSeconds;
+  }
+
+  public void setPipeSubtaskCronEventInjectorExecutionIntervalSeconds(
+      int pipeSubtaskCronEventInjectorExecutionIntervalSeconds) {
+    this.pipeSubtaskCronEventInjectorExecutionIntervalSeconds =
+        pipeSubtaskCronEventInjectorExecutionIntervalSeconds;
+  }
+
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
     this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ab4be8eacfc..6177623fead 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -300,6 +300,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+    config.setPipeSubtaskCronEventInjectorExecutionIntervalSeconds(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_subtask_cron_event_injector_execution_interval_seconds",
+                String.valueOf(config.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds()))));
 
     config.setPipeExtractorAssignerDisruptorRingBufferSize(
         Integer.parseInt(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index f19f948d6cc..2c6e307b0cc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -71,6 +71,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
   }
 
+  public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() {
+    return COMMON_CONFIG.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds();
+  }
+
   /////////////////////////////// Extractor ///////////////////////////////
 
   public int getPipeExtractorAssignerDisruptorRingBufferSize() {