You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/10 19:48:49 UTC
[pulsar] branch master updated: Introduce Source and Sink
exceptions in Status (#3142)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8c7ff2b Introduce Source and Sink exceptions in Status (#3142)
8c7ff2b is described below
commit 8c7ff2ba7abb57a0a9e502bfd051c24c02835749
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Dec 10 11:48:44 2018 -0800
Introduce Source and Sink exceptions in Status (#3142)
* Introduce Source and Sink exceptions in Status
* Added the counter at the right place
---
.../pulsar/common/policies/data/SinkStatus.java | 6 +
.../pulsar/common/policies/data/SourceStatus.java | 6 +
.../functions/instance/FunctionStatsManager.java | 127 +++++++++++++++++++++
.../functions/instance/JavaInstanceRunnable.java | 2 +
.../src/main/proto/InstanceCommunication.proto | 4 +
.../functions/worker/rest/api/FunctionsImpl.java | 18 ++-
.../pulsar/functions/worker/rest/api/SinkImpl.java | 28 ++++-
.../functions/worker/rest/api/SourceImpl.java | 28 ++++-
8 files changed, 210 insertions(+), 9 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
index 609f107..3b7a7a1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
@@ -58,6 +58,12 @@ public class SinkStatus {
// A list of the most recent system exceptions
public List<ExceptionInformation> latestSystemExceptions;
+ // Number of times there was a sink exception
+ public long numSinkExceptions;
+
+ // A list of the most recent sink exceptions
+ public List<ExceptionInformation> latestSinkExceptions;
+
// Number of messages written to sink
public long numWrittenToSink;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
index 4043900..3f103e0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
@@ -58,6 +58,12 @@ public class SourceStatus {
// A list of the most recent system exceptions
public List<ExceptionInformation> latestSystemExceptions;
+ // Number of times there was a exception from source while reading messages
+ public long numSourceExceptions;
+
+ // A list of the most recent source exceptions
+ public List<ExceptionInformation> latestSourceExceptions;
+
// Number of messages written into pulsar
public long numWritten;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 9059f79..96941f5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -60,6 +60,8 @@ public class FunctionStatsManager implements AutoCloseable {
public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total";
public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
+ public static final String SOURCE_EXCEPTIONS_TOTAL = "source_exceptions_total";
+ public static final String SINK_EXCEPTIONS_TOTAL = "sink_exceptions_total";
public static final String PROCESS_LATENCY_MS = "process_latency_ms";
public static final String LAST_INVOCATION = "last_invocation";
public static final String RECEIVED_TOTAL = "received_total";
@@ -67,6 +69,8 @@ public class FunctionStatsManager implements AutoCloseable {
public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min = "processed_successfully_total_1min";
public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min";
public static final String USER_EXCEPTIONS_TOTAL_1min = "user_exceptions_total_1min";
+ public static final String SOURCE_EXCEPTIONS_TOTAL_1min = "source_exceptions_total_1min";
+ public static final String SINK_EXCEPTIONS_TOTAL_1min = "sink_exceptions_total_1min";
public static final String PROCESS_LATENCY_MS_1min = "process_latency_ms_1min";
public static final String RECEIVED_TOTAL_1min = "received_total_1min";
@@ -78,6 +82,10 @@ public class FunctionStatsManager implements AutoCloseable {
final Counter statTotalUserExceptions;
+ final Counter statTotalSourceExceptions;
+
+ final Counter statTotalSinkExceptions;
+
final Summary statProcessLatency;
final Gauge statlastInvocation;
@@ -92,6 +100,10 @@ public class FunctionStatsManager implements AutoCloseable {
final Counter statTotalUserExceptions1min;
+ final Counter statTotalSourceExceptions1min;
+
+ final Counter statTotalSinkExceptions1min;
+
final Summary statProcessLatency1min;
final Counter statTotalRecordsRecieved1min;
@@ -102,6 +114,10 @@ public class FunctionStatsManager implements AutoCloseable {
final Gauge sysExceptions;
+ final Gauge sourceExceptions;
+
+ final Gauge sinkExceptions;
+
private String[] metricsLabels;
private ScheduledFuture<?> scheduledFuture;
@@ -112,11 +128,19 @@ public class FunctionStatsManager implements AutoCloseable {
private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10);
@Getter
private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);
+ @Getter
+ private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSourceExceptions = EvictingQueue.create(10);
+ @Getter
+ private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSinkExceptions = EvictingQueue.create(10);
private final RateLimiter userExceptionRateLimiter;
private final RateLimiter sysExceptionRateLimiter;
+ private final RateLimiter sourceExceptionRateLimiter;
+
+ private final RateLimiter sinkExceptionRateLimiter;
+
public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) {
this.collectorRegistry = collectorRegistry;
@@ -141,6 +165,18 @@ public class FunctionStatsManager implements AutoCloseable {
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ statTotalSourceExceptions = Counter.build()
+ .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
+ .help("Total number of source exceptions.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+
+ statTotalSinkExceptions = Counter.build()
+ .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
+ .help("Total number of sink exceptions.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+
statProcessLatency = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
.help("Process latency in milliseconds.")
@@ -181,6 +217,18 @@ public class FunctionStatsManager implements AutoCloseable {
.labelNames(metricsLabelNames)
.register(collectorRegistry);
+ statTotalSourceExceptions1min = Counter.build()
+ .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min)
+ .help("Total number of source exceptions in the last 1 minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+
+ statTotalSinkExceptions1min = Counter.build()
+ .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
+ .help("Total number of sink exceptions in the last 1 minute.")
+ .labelNames(metricsLabelNames)
+ .register(collectorRegistry);
+
statProcessLatency1min = Summary.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
.help("Process latency in milliseconds in the last 1 minute.")
@@ -209,6 +257,18 @@ public class FunctionStatsManager implements AutoCloseable {
.help("Exception from system code.")
.register(collectorRegistry);
+ sourceExceptions = Gauge.build()
+ .name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
+ .labelNames(exceptionMetricsLabelNames)
+ .help("Exception from source.")
+ .register(collectorRegistry);
+
+ sinkExceptions = Gauge.build()
+ .name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
+ .labelNames(exceptionMetricsLabelNames)
+ .help("Exception from sink.")
+ .register(collectorRegistry);
+
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -222,6 +282,9 @@ public class FunctionStatsManager implements AutoCloseable {
userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+ sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+
}
public void addUserException(Exception ex) {
@@ -256,6 +319,38 @@ public class FunctionStatsManager implements AutoCloseable {
}
}
+ public void addSourceException(Throwable ex) {
+ long ts = System.currentTimeMillis();
+ InstanceCommunication.FunctionStatus.ExceptionInformation info =
+ InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+ .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
+ latestSourceExceptions.add(info);
+
+ // report exception throw prometheus
+ if (sourceExceptionRateLimiter.tryAcquire()) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
+ }
+ }
+
+ public void addSinkException(Throwable ex) {
+ long ts = System.currentTimeMillis();
+ InstanceCommunication.FunctionStatus.ExceptionInformation info =
+ InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+ .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
+ latestSinkExceptions.add(info);
+
+ // report exception throw prometheus
+ if (sinkExceptionRateLimiter.tryAcquire()) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
+ }
+ }
+
public void incrTotalReceived() {
statTotalRecordsRecieved.labels(metricsLabels).inc();
statTotalRecordsRecieved1min.labels(metricsLabels).inc();
@@ -278,6 +373,18 @@ public class FunctionStatsManager implements AutoCloseable {
addUserException(userException);
}
+ public void incrSourceExceptions(Exception userException) {
+ statTotalSourceExceptions.labels(metricsLabels).inc();
+ statTotalSourceExceptions1min.labels(metricsLabels).inc();
+ addSourceException(userException);
+ }
+
+ public void incrSinkExceptions(Exception userException) {
+ statTotalSinkExceptions.labels(metricsLabels).inc();
+ statTotalSinkExceptions1min.labels(metricsLabels).inc();
+ addSinkException(userException);
+ }
+
public void setLastInvocation(long ts) {
statlastInvocation.labels(metricsLabels).set(ts);
}
@@ -311,6 +418,14 @@ public class FunctionStatsManager implements AutoCloseable {
return statTotalUserExceptions.labels(metricsLabels).get();
}
+ public double getTotalSourceExceptions() {
+ return statTotalSourceExceptions.labels(metricsLabels).get();
+ }
+
+ public double getTotalSinkExceptions() {
+ return statTotalSinkExceptions.labels(metricsLabels).get();
+ }
+
public double getLastInvocation() {
return statlastInvocation.labels(metricsLabels).get();
}
@@ -352,6 +467,14 @@ public class FunctionStatsManager implements AutoCloseable {
return statTotalUserExceptions1min.labels(metricsLabels).get();
}
+ public double getTotalSourceExceptions1min() {
+ return statTotalSourceExceptions1min.labels(metricsLabels).get();
+ }
+
+ public double getTotalSinkExceptions1min() {
+ return statTotalSinkExceptions1min.labels(metricsLabels).get();
+ }
+
public double getAvgProcessLatency1min() {
return statProcessLatency1min.labels(metricsLabels).get().count <= 0.0
? 0 : statProcessLatency1min.labels(metricsLabels).get().sum / statProcessLatency1min.labels(metricsLabels).get().count;
@@ -377,10 +500,14 @@ public class FunctionStatsManager implements AutoCloseable {
statTotalProcessedSuccessfully1min.clear();
statTotalSysExceptions1min.clear();
statTotalUserExceptions1min.clear();
+ statTotalSourceExceptions1min.clear();
+ statTotalSinkExceptions1min.clear();
statProcessLatency1min.clear();
statTotalRecordsRecieved1min.clear();
latestUserExceptions.clear();
latestSystemExceptions.clear();
+ latestSourceExceptions.clear();
+ latestSinkExceptions.clear();
}
public String getStatsAsString() throws IOException {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e55fabc..9e727a5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -419,6 +419,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.sink.write(new SinkRecord<>(srcRecord, output));
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
+ stats.incrSinkExceptions(e);
throw new RuntimeException(e);
}
}
@@ -428,6 +429,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
try {
record = this.source.read();
} catch (Exception e) {
+ stats.incrSourceExceptions(e);
log.info("Encountered exception in source read: ", e);
throw new RuntimeException(e);
}
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 13bb206..1b50ea8 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -39,6 +39,10 @@ message FunctionStatus {
repeated ExceptionInformation latestUserExceptions = 7;
int64 numSystemExceptions = 8;
repeated ExceptionInformation latestSystemExceptions = 9;
+ int64 numSourceExceptions = 18;
+ repeated ExceptionInformation latestSourceExceptions = 19;
+ int64 numSinkExceptions = 20;
+ repeated ExceptionInformation latestSinkExceptions = 21;
// map from topic name to number of deserialization exceptions
// map<string, int64> deserializationExceptions = 10;
// number of serialization exceptions on the output
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 355cf11..9a6f739 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -74,7 +74,9 @@ public class FunctionsImpl extends ComponentImpl {
}
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
- functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+ // For regular functions source/sink errors are system exceptions
+ functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions()
+ + status.getNumSourceExceptions() + status.getNumSinkExceptions());
List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
ExceptionInformation exceptionInformation
@@ -83,6 +85,20 @@ public class FunctionsImpl extends ComponentImpl {
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
systemExceptionInformationList.add(exceptionInformation);
}
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 45bbba4..6d70fbb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -62,17 +62,18 @@ public class SinkImpl extends ComponentImpl {
sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived());
- List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
+ // We treat source/user/system exceptions returned from function as system exceptions
+ sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions()
+ + status.getNumUserExceptions() + status.getNumSourceExceptions());
+ List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
ExceptionInformation exceptionInformation
= new ExceptionInformation();
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
- userExceptionInformationList.add(exceptionInformation);
+ systemExceptionInformationList.add(exceptionInformation);
}
- sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
- List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
ExceptionInformation exceptionInformation
= new ExceptionInformation();
@@ -80,8 +81,27 @@ public class SinkImpl extends ComponentImpl {
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
systemExceptionInformationList.add(exceptionInformation);
}
+
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
+ sinkInstanceStatusData.setNumSinkExceptions(status.getNumSinkExceptions());
+ List<ExceptionInformation> sinkExceptionInformationList = new LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ sinkExceptionInformationList.add(exceptionInformation);
+ }
+ sinkInstanceStatusData.setLatestSinkExceptions(sinkExceptionInformationList);
+
sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed());
sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
sinkInstanceStatusData.setWorkerId(assignedWorkerId);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index f05eea6..8c3c988 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -61,17 +61,29 @@ public class SourceImpl extends ComponentImpl {
sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
sourceInstanceStatusData.setNumReceivedFromSource(status.getNumReceived());
- List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
- for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
+ sourceInstanceStatusData.setNumSourceExceptions(status.getNumSourceExceptions());
+ List<ExceptionInformation> sourceExceptionInformationList = new LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
ExceptionInformation exceptionInformation
= new ExceptionInformation();
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
- userExceptionInformationList.add(exceptionInformation);
+ sourceExceptionInformationList.add(exceptionInformation);
}
+ sourceInstanceStatusData.setLatestSourceExceptions(sourceExceptionInformationList);
- sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+ // Source treats all system and sink exceptions as system exceptions
+ sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions()
+ + status.getNumUserExceptions() + status.getNumSinkExceptions());
List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
+
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
ExceptionInformation exceptionInformation
= new ExceptionInformation();
@@ -79,6 +91,14 @@ public class SourceImpl extends ComponentImpl {
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
systemExceptionInformationList.add(exceptionInformation);
}
+
+ for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ systemExceptionInformationList.add(exceptionInformation);
+ }
sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
sourceInstanceStatusData.setNumWritten(status.getNumSuccessfullyProcessed());