You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/07/28 17:11:48 UTC

[pulsar] branch master updated: Allow topic compaction to be disabled in Pulsar Functions (#7677)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e2bf486  Allow topic compaction to be disabled in Pulsar Functions (#7677)
e2bf486 is described below

commit e2bf486287e7392fa2ff0edde0e7ad631f828132
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Jul 28 10:11:26 2020 -0700

    Allow topic compaction to be disabled in Pulsar Functions (#7677)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../java/org/apache/pulsar/functions/worker/SchedulerManager.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 21961cf..b1ac384 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -189,7 +189,9 @@ public class SchedulerManager implements AutoCloseable {
                     new LinkedBlockingQueue<>(5));
             executorService.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build());
             scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-assignment-topic-compactor"));
-            scheduleCompaction(this.scheduledExecutorService, workerConfig.getTopicCompactionFrequencySec());
+            if (workerConfig.getTopicCompactionFrequencySec() > 0) {
+                scheduleCompaction(this.scheduledExecutorService, workerConfig.getTopicCompactionFrequencySec());
+            }
 
             isRunning = true;
             lastMessageProduced = null;