You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/09/28 09:55:48 UTC
[iotdb] 01/05: new config prop: pipeSubtaskCronEventInjectorExecutionIntervalSeconds
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch fix-pipe-cause-wal-pin
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6f7d78721cc70124b503961bc483f9f6fd409bae
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Sep 28 12:23:24 2023 +0800
new config prop: pipeSubtaskCronEventInjectorExecutionIntervalSeconds
---
.../iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java | 4 +++-
.../main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 11 +++++++++++
.../java/org/apache/iotdb/commons/conf/CommonDescriptor.java | 5 +++++
.../java/org/apache/iotdb/commons/pipe/config/PipeConfig.java | 4 ++++
4 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
index 8ec9798e187..20d8028855c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeCronEventInjector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.slf4j.Logger;
@@ -35,7 +36,8 @@ public class PipeCronEventInjector {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeCronEventInjector.class);
- private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS = 1;
+ private static final int CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
+ PipeConfig.getInstance().getPipeSubtaskCronEventInjectorExecutionIntervalSeconds();
private static final ScheduledExecutorService CRON_EVENT_INJECTOR_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c96a70d07df..1bc039f2d54 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -161,6 +161,7 @@ public class CommonConfig {
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
+ private int pipeSubtaskCronEventInjectorExecutionIntervalSeconds = 10;
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private int pipeExtractorMatcherCacheSize = 1024;
@@ -711,6 +712,16 @@ public class CommonConfig {
pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
}
+ public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() {
+ return pipeSubtaskCronEventInjectorExecutionIntervalSeconds;
+ }
+
+ public void setPipeSubtaskCronEventInjectorExecutionIntervalSeconds(
+ int pipeSubtaskCronEventInjectorExecutionIntervalSeconds) {
+ this.pipeSubtaskCronEventInjectorExecutionIntervalSeconds =
+ pipeSubtaskCronEventInjectorExecutionIntervalSeconds;
+ }
+
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ab4be8eacfc..6177623fead 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -300,6 +300,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_subtask_executor_pending_queue_max_blocking_time_ms",
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+ config.setPipeSubtaskCronEventInjectorExecutionIntervalSeconds(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_subtask_cron_event_injector_execution_interval_seconds",
+ String.valueOf(config.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds()))));
config.setPipeExtractorAssignerDisruptorRingBufferSize(
Integer.parseInt(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index f19f948d6cc..2c6e307b0cc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -71,6 +71,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
}
+ public int getPipeSubtaskCronEventInjectorExecutionIntervalSeconds() {
+ return COMMON_CONFIG.getPipeSubtaskCronEventInjectorExecutionIntervalSeconds();
+ }
+
/////////////////////////////// Extractor ///////////////////////////////
public int getPipeExtractorAssignerDisruptorRingBufferSize() {