You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/28 06:56:22 UTC

[pulsar] branch master updated: Misc Function fixes:- (#3907)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19dd255  Misc Function fixes:- (#3907)
19dd255 is described below

commit 19dd255526272f3310a7f9f4cd53d25eecd442e9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Mar 27 23:56:17 2019 -0700

    Misc Function fixes:- (#3907)
    
    1) Collect input topics from the function details spec
    2) Catch all errors during source/sink close since it's user code
---
 .../java/org/apache/pulsar/functions/instance/ContextImpl.java |  7 ++-----
 .../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 10 +++-------
 .../org/apache/pulsar/functions/instance/ContextImplTest.java  |  1 -
 3 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 2cb0537..5400b41 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -76,8 +76,6 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     private Map<String, Producer<?>> publishProducers;
     private ProducerBuilderImpl<?> producerBuilder;
 
-    private final List<String> inputTopics;
-
     private final TopicSchema topicSchema;
 
     private final SecretsProvider secretsProvider;
@@ -102,13 +100,12 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     }
     private final Utils.ComponentType componentType;
 
-    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
+    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
                        Utils.ComponentType componentType, ComponentStatsManager statsManager) {
         this.config = config;
         this.logger = logger;
         this.publishProducers = new HashMap<>();
-        this.inputTopics = inputTopics;
         this.topicSchema = new TopicSchema(client);
         this.statsManager = statsManager;
 
@@ -169,7 +166,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
 
     @Override
     public Collection<String> getInputTopics() {
-        return inputTopics;
+        return config.getFunctionDetails().getSource().getInputSpecsMap().keySet();
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 667e449..e210eb6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -207,13 +207,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     }
 
     ContextImpl setupContext() {
-        List<String> inputTopics = null;
-        if (source instanceof PulsarSource) {
-            inputTopics = ((PulsarSource<?>) source).getInputTopics();
-        }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
+        return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
                 collectorRegistry, metricsLabels, this.componentType, this.stats);
     }
 
@@ -470,7 +466,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         if (source != null) {
             try {
                 source.close();
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
 
             }
@@ -479,7 +475,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         if (sink != null) {
             try {
                 sink.close();
-            } catch (Exception e) {
+            } catch (Throwable e) {
                 log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
             }
         }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 7523fb1..df243cf 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -77,7 +77,6 @@ public class ContextImplTest {
             config,
             logger,
             client,
-            new ArrayList<>(),
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
                 Utils.ComponentType.FUNCTION, null);
     }