You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/28 06:56:22 UTC
[pulsar] branch master updated: Misc Function fixes:- (#3907)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19dd255 Misc Function fixes:- (#3907)
19dd255 is described below
commit 19dd255526272f3310a7f9f4cd53d25eecd442e9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Mar 27 23:56:17 2019 -0700
Misc Function fixes:- (#3907)
1) Collect input topics from the function details spec
2) Catch all errors during source/sink close, since it's user code
---
.../java/org/apache/pulsar/functions/instance/ContextImpl.java | 7 ++-----
.../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 10 +++-------
.../org/apache/pulsar/functions/instance/ContextImplTest.java | 1 -
3 files changed, 5 insertions(+), 13 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 2cb0537..5400b41 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -76,8 +76,6 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private Map<String, Producer<?>> publishProducers;
private ProducerBuilderImpl<?> producerBuilder;
- private final List<String> inputTopics;
-
private final TopicSchema topicSchema;
private final SecretsProvider secretsProvider;
@@ -102,13 +100,12 @@ class ContextImpl implements Context, SinkContext, SourceContext {
}
private final Utils.ComponentType componentType;
- public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
+ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
Utils.ComponentType componentType, ComponentStatsManager statsManager) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
- this.inputTopics = inputTopics;
this.topicSchema = new TopicSchema(client);
this.statsManager = statsManager;
@@ -169,7 +166,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
@Override
public Collection<String> getInputTopics() {
- return inputTopics;
+ return config.getFunctionDetails().getSource().getInputSpecsMap().keySet();
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 667e449..e210eb6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -207,13 +207,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
ContextImpl setupContext() {
- List<String> inputTopics = null;
- if (source instanceof PulsarSource) {
- inputTopics = ((PulsarSource<?>) source).getInputTopics();
- }
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
- return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
+ return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats);
}
@@ -470,7 +466,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (source != null) {
try {
source.close();
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
}
@@ -479,7 +475,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (sink != null) {
try {
sink.close();
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
}
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 7523fb1..df243cf 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -77,7 +77,6 @@ public class ContextImplTest {
config,
logger,
client,
- new ArrayList<>(),
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
Utils.ComponentType.FUNCTION, null);
}