You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/09/03 19:13:38 UTC

[pulsar] branch master updated: Various fixes for Batch Source (#7965)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1632dca  Various fixes for Batch Source (#7965)
1632dca is described below

commit 1632dcaf24a094a1fae31dacd457da88354c94c1
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Sep 3 12:13:25 2020 -0700

    Various fixes for Batch Source (#7965)
    
    * Various fixes for Batch Source
    1. Create intermediate topic/subscription prior to function running in case auto topic creation is turned off
    2. Fix possible NPE in CronTriggerer when calling stop()
    3. Stop all producers created in ContextImpl
    4. Delete intermediate topic for batch source
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../pulsar/functions/instance/ContextImpl.java     |  26 +-
 .../pulsar/functions/instance/JavaInstance.java    |   1 +
 .../source/batch/BatchSourceExecutor.java          |  71 +++---
 .../source/batch/BatchSourceExecutorTest.java      |   1 +
 .../pulsar/functions/worker/FunctionActioner.java  | 281 ++++++++++++++++-----
 .../pulsar/io/batchdiscovery/CronTriggerer.java    |   5 +-
 6 files changed, 288 insertions(+), 97 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index e3f169e..2be3e64 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -59,7 +60,7 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 /**
  * This class implements the Context interface exposed to the user.
  */
-class ContextImpl implements Context, SinkContext, SourceContext {
+class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
     private InstanceConfig config;
     private Logger logger;
 
@@ -603,4 +604,27 @@ class ContextImpl implements Context, SinkContext, SourceContext {
             this.underlyingBuilder = underlyingBuilder;
         }
     }
+
+    @Override
+    public void close() {
+        List<CompletableFuture> futures = new LinkedList<>();
+
+        if (publishProducers != null) {
+            for (Producer<?> producer : publishProducers.values()) {
+                futures.add(producer.closeAsync());
+            }
+        }
+
+        if (tlPublishProducers != null) {
+            for (Producer<?> producer : tlPublishProducers.get().values()) {
+                futures.add(producer.closeAsync());
+            }
+        }
+
+        try {
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.warn("Failed to close producers", e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 1e18a07..8c9616d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -127,6 +127,7 @@ public class JavaInstance implements AutoCloseable {
 
     @Override
     public void close() {
+        context.close();
     }
 
     public Map<String, Double> getAndResetMetrics() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
index 2933b0b..c42d22c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
@@ -28,6 +28,8 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
@@ -126,9 +128,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
   }
 
   private void start() throws Exception {
-    // This is the first thing to do to ensure that any tasks discovered during the discover
-    // phase are not lost
-    setupInstanceSubscription();
+    createIntermediateTopicConsumer();
     batchSource.open(this.config, this.sourceContext);
     if (sourceContext.getInstanceId() == 0) {
       discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(),
@@ -188,45 +188,50 @@ public class BatchSourceExecutor<T> implements Source<T> {
     }
   }
 
-  private void setupInstanceSubscription() {
+  private void createIntermediateTopicConsumer() {
     String subName = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(
-            sourceContext.getTenant(), sourceContext.getNamespace(),
-            sourceContext.getSourceName());
+      sourceContext.getTenant(), sourceContext.getNamespace(),
+      sourceContext.getSourceName());
+    String fqfn = FunctionCommon.getFullyQualifiedName(
+      sourceContext.getTenant(), sourceContext.getNamespace(),
+      sourceContext.getSourceName());
     try {
       Actions.newBuilder()
-              .addAction(
-                      Actions.Action.builder()
-                              .actionName(String.format("Setting up instance consumer for BatchSource intermediate " +
-                                      "topic for function %s", FunctionCommon.getFullyQualifiedName(
-                                              sourceContext.getTenant(), sourceContext.getNamespace(),
-                                      sourceContext.getSourceName())))
-                              .numRetries(10)
-                              .sleepBetweenInvocationsMs(1000)
-                              .supplier(() -> {
-                                try {
-                                  CompletableFuture<Consumer<byte[]>> cf = sourceContext.newConsumerBuilder(Schema.BYTES)
-                                          .subscriptionName(subName)
-                                          .subscriptionType(SubscriptionType.Shared)
-                                          .topic(intermediateTopicName)
-                                          .subscribeAsync();
-                                  intermediateTopicConsumer = cf.join();
-                                  return Actions.ActionResult.builder()
-                                          .success(true)
-                                          .build();
-                                } catch (Exception e) {
-                                    return Actions.ActionResult.builder()
-                                            .success(false)
-                                            .build();
-                                }
-                              })
-                              .build())
-              .run();
+        .addAction(
+          Actions.Action.builder()
+            .actionName(String.format("Setting up instance consumer for BatchSource intermediate " +
+              "topic for function %s", fqfn))
+            .numRetries(10)
+            .sleepBetweenInvocationsMs(1000)
+            .supplier(() -> {
+              try {
+                CompletableFuture<Consumer<byte[]>> cf = sourceContext.newConsumerBuilder(Schema.BYTES)
+                  .subscriptionName(subName)
+                  .subscriptionType(SubscriptionType.Shared)
+                  .topic(intermediateTopicName)
+                  .properties(InstanceUtils.getProperties(
+                    Function.FunctionDetails.ComponentType.SOURCE, fqfn, sourceContext.getInstanceId()))
+                  .subscribeAsync();
+                intermediateTopicConsumer = cf.join();
+                return Actions.ActionResult.builder()
+                  .success(true)
+                  .build();
+              } catch (Exception e) {
+                return Actions.ActionResult.builder()
+                  .success(false)
+                  .errorMsg(e.getMessage())
+                  .build();
+              }
+            })
+            .build())
+        .run();
     } catch (InterruptedException e) {
       log.error("Error setting up instance subscription for intermediate topic", e);
       throw new RuntimeException(e);
     }
   }
 
+
   private void retrieveNextTask() throws Exception {
     currentTask = intermediateTopicConsumer.receive();
     return;
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
index 28c8ac9..5125453 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java
@@ -222,6 +222,7 @@ public class BatchSourceExecutorTest {
     consumerBuilder = Mockito.mock(ConsumerBuilder.class);
     Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(Mockito.any());
     Mockito.doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(Mockito.any());
+    Mockito.doReturn(consumerBuilder).when(consumerBuilder).properties(Mockito.anyMap());
     Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any());
     discoveredTask = Mockito.mock(Message.class);
     consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index b43ce0a..a79138d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.io.MoreFiles;
 import com.google.common.io.RecursiveDeleteOption;
 
@@ -27,10 +28,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -60,6 +64,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -130,6 +135,9 @@ public class FunctionActioner {
                 }
             }
 
+            // Setup for batch sources if necessary
+            setupBatchSource(functionDetails);
+
             RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
             functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
 
@@ -325,81 +333,136 @@ public class FunctionActioner {
                             ? InstanceUtils.getDefaultSubscriptionName(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails())
                             : functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName();
 
-                    deleteSubscription(topic, consumerSpec, subscriptionName, fqfn);
+                    deleteSubscription(topic, consumerSpec, subscriptionName, String.format("Cleaning up subscriptions for function %s", fqfn));
                 }
             });
         }
-        if (InstanceUtils.calculateSubjectType(details) == FunctionDetails.ComponentType.SOURCE) {
-            // topicName -> subscriptions
-            Map<String, String> subscriptions =
-                    SourceConfigUtils.computeBatchSourceIntermediateTopicSubscriptions(details,
-                            FunctionCommon.getFullyQualifiedName(details));
-            if (subscriptions != null) {
-                subscriptions.forEach((topic, subscriptionName) -> {
-                    Function.ConsumerSpec consumerSpec = Function.ConsumerSpec.newBuilder().setIsRegexPattern(false).build();
-                    deleteSubscription(topic, consumerSpec, subscriptionName, fqfn);
-                });
-            }
-        }
+
+        // clean up done for batch sources if necessary
+        cleanupBatchSource(details);
     }
 
-    private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String fqfn) {
+    private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String msg) {
         try {
             Actions.newBuilder()
                     .addAction(
-                            Actions.Action.builder()
-                                    .actionName(String.format("Cleaning up subscriptions for function %s", fqfn))
-                                    .numRetries(10)
-                                    .sleepBetweenInvocationsMs(1000)
-                                    .supplier(() -> {
-                                        try {
-                                            if (consumerSpec.getIsRegexPattern()) {
-                                                pulsarAdmin.namespaces().unsubscribeNamespace(TopicName
-                                                        .get(topic).getNamespace(), subscriptionName);
-                                            } else {
-                                                pulsarAdmin.topics().deleteSubscription(topic,
-                                                        subscriptionName);
-                                            }
-                                        } catch (PulsarAdminException e) {
-                                            if (e instanceof PulsarAdminException.NotFoundException) {
-                                                return Actions.ActionResult.builder()
-                                                        .success(true)
-                                                        .build();
-                                            } else {
-                                                // for debugging purposes
-                                                List<Map<String, String>> existingConsumers = Collections.emptyList();
-                                                try {
-                                                    TopicStats stats = pulsarAdmin.topics().getStats(topic);
-                                                    SubscriptionStats sub = stats.subscriptions.get(subscriptionName);
-                                                    if (sub != null) {
-                                                        existingConsumers = sub.consumers.stream()
-                                                                .map(consumerStats -> consumerStats.metadata)
-                                                                .collect(Collectors.toList());
-                                                    }
-                                                } catch (PulsarAdminException e1) {
-
-                                                }
-
-                                                String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
-                                                return Actions.ActionResult.builder()
-                                                        .success(false)
-                                                        .errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers))
-                                                        .build();
-                                            }
-                                        }
-
-                                        return Actions.ActionResult.builder()
-                                                .success(true)
-                                                .build();
-
-                                    })
-                                    .build())
+                      Actions.Action.builder()
+                        .actionName(msg)
+                        .numRetries(10)
+                        .sleepBetweenInvocationsMs(1000)
+                        .supplier(
+                          getDeleteSubscriptionSupplier(topic,
+                            consumerSpec.getIsRegexPattern(),
+                            subscriptionName)
+                        )
+                        .build())
                     .run();
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private Supplier<Actions.ActionResult> getDeleteSubscriptionSupplier(
+      String topic, boolean isRegex, String subscriptionName) {
+        return () -> {
+            try {
+                if (isRegex) {
+                    pulsarAdmin.namespaces().unsubscribeNamespace(TopicName
+                      .get(topic).getNamespace(), subscriptionName);
+                } else {
+                    pulsarAdmin.topics().deleteSubscription(topic,
+                      subscriptionName);
+                }
+            } catch (PulsarAdminException e) {
+                if (e instanceof PulsarAdminException.NotFoundException) {
+                    return Actions.ActionResult.builder()
+                      .success(true)
+                      .build();
+                } else {
+                    // for debugging purposes
+                    List<Map<String, String>> existingConsumers = Collections.emptyList();
+                    SubscriptionStats sub = null;
+                    try {
+                        TopicStats stats = pulsarAdmin.topics().getStats(topic);
+                        sub = stats.subscriptions.get(subscriptionName);
+                        if (sub != null) {
+                            existingConsumers = sub.consumers.stream()
+                              .map(consumerStats -> consumerStats.metadata)
+                              .collect(Collectors.toList());
+                        }
+                    } catch (PulsarAdminException e1) {
+
+                    }
+
+                    String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
+                    String finalErrorMsg;
+                    if (sub != null) {
+                        try {
+                            finalErrorMsg = String.format("%s - existing consumers: %s",
+                              errorMsg, ObjectMapperFactory.getThreadLocal().writeValueAsString(sub));
+                        } catch (JsonProcessingException jsonProcessingException) {
+                            finalErrorMsg = errorMsg;
+                        }
+                    } else {
+                        finalErrorMsg = errorMsg;
+                    }
+                    return Actions.ActionResult.builder()
+                      .success(false)
+                      .errorMsg(finalErrorMsg)
+                      .build();
+                }
+            }
+
+            return Actions.ActionResult.builder()
+              .success(true)
+              .build();
+        };
+    }
+
+    private Supplier<Actions.ActionResult> getDeleteTopicSupplier(String topic) {
+        return () -> {
+            try {
+                pulsarAdmin.topics().delete(topic);
+            } catch (PulsarAdminException e) {
+                if (e instanceof PulsarAdminException.NotFoundException) {
+                    return Actions.ActionResult.builder()
+                      .success(true)
+                      .build();
+                } else {
+                    // for debugging purposes
+                    TopicStats stats = null;
+                    try {
+                        stats = pulsarAdmin.topics().getStats(topic);
+                    } catch (PulsarAdminException e1) {
+
+                    }
+
+                    String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
+                    String finalErrorMsg;
+                    if (stats != null) {
+                        try {
+                            finalErrorMsg = String.format("%s - topic stats: %s",
+                              errorMsg, ObjectMapperFactory.getThreadLocal().writeValueAsString(stats));
+                        } catch (JsonProcessingException jsonProcessingException) {
+                            finalErrorMsg = errorMsg;
+                        }
+                    } else {
+                        finalErrorMsg = errorMsg;
+                    }
+
+                    return Actions.ActionResult.builder()
+                      .success(false)
+                      .errorMsg(finalErrorMsg)
+                      .build();
+                }
+            }
+
+            return Actions.ActionResult.builder()
+              .success(true)
+              .build();
+        };
+    }
+
     private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) {
         return StringUtils.join(
                 new String[]{
@@ -508,4 +571,100 @@ public class FunctionActioner {
                 throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime());
         }
     }
+
+    private void setupBatchSource(Function.FunctionDetails functionDetails) {
+        if (isBatchSource(functionDetails)) {
+
+            String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(
+              functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName()).toString();
+
+            String intermediateTopicSubscription = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(
+              functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
+            String fqfn = FunctionCommon.getFullyQualifiedName(
+              functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
+            try {
+                Actions.newBuilder()
+                  .addAction(
+                    Actions.Action.builder()
+                      .actionName(String.format("Creating intermediate topic %s with subscription %s for Batch Source %s",
+                        intermediateTopicName, intermediateTopicSubscription, fqfn))
+                      .numRetries(10)
+                      .sleepBetweenInvocationsMs(1000)
+                      .supplier(() -> {
+                          try {
+                              pulsarAdmin.topics().createSubscription(intermediateTopicName, intermediateTopicSubscription, MessageId.latest);
+                              return Actions.ActionResult.builder()
+                                .success(true)
+                                .build();
+                          } catch (PulsarAdminException.ConflictException e) {
+                              // topic and subscription already exists so just continue
+                              return Actions.ActionResult.builder()
+                                .success(true)
+                                .build();
+                          } catch (Exception e) {
+                              return Actions.ActionResult.builder()
+                                .errorMsg(e.getMessage())
+                                .success(false)
+                                .build();
+                          }
+                      })
+                      .build())
+                  .run();
+            } catch (InterruptedException e) {
+                log.error("Error setting up instance subscription for intermediate topic", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void cleanupBatchSource(Function.FunctionDetails functionDetails) {
+        if (isBatchSource(functionDetails)) {
+            // clean up intermediate topic
+            String intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(functionDetails.getTenant(),
+              functionDetails.getNamespace(), functionDetails.getName()).toString();
+            String intermediateTopicSubscription = SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(
+              functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
+            String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails);
+            try {
+                Actions.newBuilder()
+                  .addAction(
+                    Actions.Action.builder()
+                      .actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s",
+                        intermediateTopicSubscription, fqfn))
+                      .numRetries(10)
+                      .sleepBetweenInvocationsMs(1000)
+                      .continueOn(true)
+                      .supplier(
+                        getDeleteSubscriptionSupplier(intermediateTopicName,
+                          false,
+                          intermediateTopicSubscription)
+                      )
+                      .build())
+                  .addAction(Actions.Action.builder()
+                    .actionName(String.format("Deleting intermediate topic %s for Batch Source %s",
+                      intermediateTopicName, fqfn))
+                    .numRetries(10)
+                    .sleepBetweenInvocationsMs(1000)
+                    .supplier(getDeleteTopicSupplier(intermediateTopicName))
+                    .build())
+                  .run();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static boolean isBatchSource(Function.FunctionDetails functionDetails) {
+        if (InstanceUtils.calculateSubjectType(functionDetails) == FunctionDetails.ComponentType.SOURCE) {
+            String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails);
+            Map<String, Object> configMap = SourceConfigUtils.extractSourceConfig(functionDetails.getSource(), fqfn);
+            if (configMap != null) {
+                BatchSourceConfig batchSourceConfig = SourceConfigUtils.extractBatchSourceConfig(configMap);
+                if (batchSourceConfig != null) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
 }
diff --git a/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java b/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java
index 6c35b9d..9b166c8 100644
--- a/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java
+++ b/pulsar-io/batch-discovery-triggerers/src/main/java/org/apache/pulsar/io/batchdiscovery/CronTriggerer.java
@@ -58,8 +58,9 @@ public class CronTriggerer implements BatchSourceTriggerer {
 
   @Override
   public void stop() {
-    scheduler.shutdown();
+    if (scheduler != null) {
+      scheduler.shutdown();
+    }
   }
-
 }