You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/05 06:13:01 UTC
[pulsar] branch master updated: On publish failures,
log error and count them as sys exceptions (#3704)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 07cebb1 On publish failures, log error and count them as sys exceptions (#3704)
07cebb1 is described below
commit 07cebb17f5df0bc3de70acacc5981271396129a9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Mar 4 22:12:56 2019 -0800
On publish failures, log error and count them as sys exceptions (#3704)
* On publish failures, log error and count them as sys exceptions
* Took feedback
---
.../org/apache/pulsar/functions/instance/ContextImpl.java | 13 +++++++++++--
.../pulsar/functions/instance/JavaInstanceRunnable.java | 2 +-
.../apache/pulsar/functions/instance/ContextImplTest.java | 2 +-
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 7271c87..60b8ec0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -89,6 +89,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private StateContextImpl stateContext;
private Map<String, Object> userConfigs;
+ private ComponentStatsManager statsManager;
+
Map<String, String[]> userMetricsLabels = new HashMap<>();
private final String[] metricsLabels;
private final Summary userMetricsSummary;
@@ -103,12 +105,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
- Utils.ComponentType componentType) {
+ Utils.ComponentType componentType, ComponentStatsManager statsManager) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
this.inputTopics = inputTopics;
this.topicSchema = new TopicSchema(client);
+ this.statsManager = statsManager;
this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
@@ -359,7 +362,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
}
}
- return producer.sendAsync(object).thenApply(msgId -> null);
+ CompletableFuture<Void> future = producer.sendAsync(object).thenApply(msgId -> null);
+ future.exceptionally(e -> {
+ this.statsManager.incrSysExceptions(e);
+ logger.error("Failed to publish to topic {} with error {}", topicName, e);
+ return null;
+ });
+ return future;
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 19a0b2a..c9bf644 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -215,7 +215,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
- collectorRegistry, metricsLabels, this.componentType);
+ collectorRegistry, metricsLabels, this.componentType, this.stats);
}
/**
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 47c3539..7523fb1 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -79,7 +79,7 @@ public class ContextImplTest {
client,
new ArrayList<>(),
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
- Utils.ComponentType.FUNCTION);
+ Utils.ComponentType.FUNCTION, null);
}
@Test(expectedExceptions = IllegalStateException.class)