You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/10 19:48:49 UTC

[pulsar] branch master updated: Introduce Source and Sink exceptions in Status (#3142)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c7ff2b  Introduce Source and Sink exceptions in Status (#3142)
8c7ff2b is described below

commit 8c7ff2ba7abb57a0a9e502bfd051c24c02835749
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Dec 10 11:48:44 2018 -0800

    Introduce Source and Sink exceptions in Status (#3142)
    
    * Introduce Source and Sink exceptions in Status
    
    * Added the counter at the right place
---
 .../pulsar/common/policies/data/SinkStatus.java    |   6 +
 .../pulsar/common/policies/data/SourceStatus.java  |   6 +
 .../functions/instance/FunctionStatsManager.java   | 127 +++++++++++++++++++++
 .../functions/instance/JavaInstanceRunnable.java   |   2 +
 .../src/main/proto/InstanceCommunication.proto     |   4 +
 .../functions/worker/rest/api/FunctionsImpl.java   |  18 ++-
 .../pulsar/functions/worker/rest/api/SinkImpl.java |  28 ++++-
 .../functions/worker/rest/api/SourceImpl.java      |  28 ++++-
 8 files changed, 210 insertions(+), 9 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
index 609f107..3b7a7a1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java
@@ -58,6 +58,12 @@ public class SinkStatus {
             // A list of the most recent system exceptions
             public List<ExceptionInformation> latestSystemExceptions;
 
+            // Number of times there was a sink exception
+            public long numSinkExceptions;
+
+            // A list of the most recent sink exceptions
+            public List<ExceptionInformation> latestSinkExceptions;
+
             // Number of messages written to sink
             public long numWrittenToSink;
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
index 4043900..3f103e0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java
@@ -58,6 +58,12 @@ public class SourceStatus {
             // A list of the most recent system exceptions
             public List<ExceptionInformation> latestSystemExceptions;
 
+            // Number of times there was an exception from source while reading messages
+            public long numSourceExceptions;
+
+            // A list of the most recent source exceptions
+            public List<ExceptionInformation> latestSourceExceptions;
+
             // Number of messages written into pulsar
             public long numWritten;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 9059f79..96941f5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -60,6 +60,8 @@ public class FunctionStatsManager implements AutoCloseable {
     public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total";
     public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
     public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
+    public static final String SOURCE_EXCEPTIONS_TOTAL = "source_exceptions_total";
+    public static final String SINK_EXCEPTIONS_TOTAL = "sink_exceptions_total";
     public static final String PROCESS_LATENCY_MS = "process_latency_ms";
     public static final String LAST_INVOCATION = "last_invocation";
     public static final String RECEIVED_TOTAL = "received_total";
@@ -67,6 +69,8 @@ public class FunctionStatsManager implements AutoCloseable {
     public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min = "processed_successfully_total_1min";
     public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min";
     public static final String USER_EXCEPTIONS_TOTAL_1min = "user_exceptions_total_1min";
+    public static final String SOURCE_EXCEPTIONS_TOTAL_1min = "source_exceptions_total_1min";
+    public static final String SINK_EXCEPTIONS_TOTAL_1min = "sink_exceptions_total_1min";
     public static final String PROCESS_LATENCY_MS_1min = "process_latency_ms_1min";
     public static final String RECEIVED_TOTAL_1min = "received_total_1min";
 
@@ -78,6 +82,10 @@ public class FunctionStatsManager implements AutoCloseable {
 
     final Counter statTotalUserExceptions;
 
+    final Counter statTotalSourceExceptions;
+
+    final Counter statTotalSinkExceptions;
+
     final Summary statProcessLatency;
 
     final Gauge statlastInvocation;
@@ -92,6 +100,10 @@ public class FunctionStatsManager implements AutoCloseable {
 
     final Counter statTotalUserExceptions1min;
 
+    final Counter statTotalSourceExceptions1min;
+
+    final Counter statTotalSinkExceptions1min;
+
     final Summary statProcessLatency1min;
 
     final Counter statTotalRecordsRecieved1min;
@@ -102,6 +114,10 @@ public class FunctionStatsManager implements AutoCloseable {
 
     final Gauge sysExceptions;
 
+    final Gauge sourceExceptions;
+
+    final Gauge sinkExceptions;
+
     private String[] metricsLabels;
 
     private ScheduledFuture<?> scheduledFuture;
@@ -112,11 +128,19 @@ public class FunctionStatsManager implements AutoCloseable {
     private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions = EvictingQueue.create(10);
     @Getter
     private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSystemExceptions = EvictingQueue.create(10);
+    @Getter
+    private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSourceExceptions = EvictingQueue.create(10);
+    @Getter
+    private EvictingQueue<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSinkExceptions = EvictingQueue.create(10);
 
     private final RateLimiter userExceptionRateLimiter;
 
     private final RateLimiter sysExceptionRateLimiter;
 
+    private final RateLimiter sourceExceptionRateLimiter;
+
+    private final RateLimiter sinkExceptionRateLimiter;
+
     public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metricsLabels, ScheduledExecutorService scheduledExecutorService) {
 
         this.collectorRegistry = collectorRegistry;
@@ -141,6 +165,18 @@ public class FunctionStatsManager implements AutoCloseable {
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
+        statTotalSourceExceptions = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL)
+                .help("Total number of source exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalSinkExceptions = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL)
+                .help("Total number of sink exceptions.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
         statProcessLatency = Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS)
                 .help("Process latency in milliseconds.")
@@ -181,6 +217,18 @@ public class FunctionStatsManager implements AutoCloseable {
                 .labelNames(metricsLabelNames)
                 .register(collectorRegistry);
 
+        statTotalSourceExceptions1min = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + SOURCE_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of source exceptions in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
+        statTotalSinkExceptions1min = Counter.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + SINK_EXCEPTIONS_TOTAL_1min)
+                .help("Total number of sink exceptions in the last 1 minute.")
+                .labelNames(metricsLabelNames)
+                .register(collectorRegistry);
+
         statProcessLatency1min = Summary.build()
                 .name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
                 .help("Process latency in milliseconds in the last 1 minute.")
@@ -209,6 +257,18 @@ public class FunctionStatsManager implements AutoCloseable {
                 .help("Exception from system code.")
                 .register(collectorRegistry);
 
+        sourceExceptions = Gauge.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + "source_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from source.")
+                .register(collectorRegistry);
+
+        sinkExceptions = Gauge.build()
+                .name(PULSAR_FUNCTION_METRICS_PREFIX + "sink_exception")
+                .labelNames(exceptionMetricsLabelNames)
+                .help("Exception from sink.")
+                .register(collectorRegistry);
+
         scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
@@ -222,6 +282,9 @@ public class FunctionStatsManager implements AutoCloseable {
 
         userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
         sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+        sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
+
     }
 
     public void addUserException(Exception ex) {
@@ -256,6 +319,38 @@ public class FunctionStatsManager implements AutoCloseable {
         }
     }
 
+    // Records a source exception in the recent-exceptions queue and, if the rate limiter allows, in prometheus.
+    public void addSourceException(Throwable ex) {
+        long ts = System.currentTimeMillis();
+        // getMessage() may be null, which the protobuf setter and prometheus label values do not accept
+        String message = ex.getMessage() != null ? ex.getMessage() : ex.toString();
+        latestSourceExceptions.add(InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                .setExceptionString(message).setMsSinceEpoch(ts).build());
+        // report exception through prometheus
+        if (sourceExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = message;
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
+    // Records a sink exception in the recent-exceptions queue and, if the rate limiter allows, in prometheus.
+    public void addSinkException(Throwable ex) {
+        long ts = System.currentTimeMillis();
+        // getMessage() may be null, which the protobuf setter and prometheus label values do not accept
+        String message = ex.getMessage() != null ? ex.getMessage() : ex.toString();
+        latestSinkExceptions.add(InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                .setExceptionString(message).setMsSinceEpoch(ts).build());
+        // report exception through prometheus
+        if (sinkExceptionRateLimiter.tryAcquire()) {
+            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = message;
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
+        }
+    }
+
     public void incrTotalReceived() {
         statTotalRecordsRecieved.labels(metricsLabels).inc();
         statTotalRecordsRecieved1min.labels(metricsLabels).inc();
@@ -278,6 +373,18 @@ public class FunctionStatsManager implements AutoCloseable {
         addUserException(userException);
     }
 
+    public void incrSourceExceptions(Exception userException) {
+        statTotalSourceExceptions.labels(metricsLabels).inc();
+        statTotalSourceExceptions1min.labels(metricsLabels).inc();
+        addSourceException(userException);
+    }
+
+    public void incrSinkExceptions(Exception userException) {
+        statTotalSinkExceptions.labels(metricsLabels).inc();
+        statTotalSinkExceptions1min.labels(metricsLabels).inc();
+        addSinkException(userException);
+    }
+
     public void setLastInvocation(long ts) {
         statlastInvocation.labels(metricsLabels).set(ts);
     }
@@ -311,6 +418,14 @@ public class FunctionStatsManager implements AutoCloseable {
         return statTotalUserExceptions.labels(metricsLabels).get();
     }
 
+    public double getTotalSourceExceptions() {
+        return statTotalSourceExceptions.labels(metricsLabels).get();
+    }
+
+    public double getTotalSinkExceptions() {
+        return statTotalSinkExceptions.labels(metricsLabels).get();
+    }
+
     public double getLastInvocation() {
         return statlastInvocation.labels(metricsLabels).get();
     }
@@ -352,6 +467,14 @@ public class FunctionStatsManager implements AutoCloseable {
         return statTotalUserExceptions1min.labels(metricsLabels).get();
     }
 
+    public double getTotalSourceExceptions1min() {
+        return statTotalSourceExceptions1min.labels(metricsLabels).get();
+    }
+
+    public double getTotalSinkExceptions1min() {
+        return statTotalSinkExceptions1min.labels(metricsLabels).get();
+    }
+
     public double getAvgProcessLatency1min() {
         return statProcessLatency1min.labels(metricsLabels).get().count <= 0.0
                 ? 0 : statProcessLatency1min.labels(metricsLabels).get().sum / statProcessLatency1min.labels(metricsLabels).get().count;
@@ -377,10 +500,14 @@ public class FunctionStatsManager implements AutoCloseable {
         statTotalProcessedSuccessfully1min.clear();
         statTotalSysExceptions1min.clear();
         statTotalUserExceptions1min.clear();
+        statTotalSourceExceptions1min.clear();
+        statTotalSinkExceptions1min.clear();
         statProcessLatency1min.clear();
         statTotalRecordsRecieved1min.clear();
         latestUserExceptions.clear();
         latestSystemExceptions.clear();
+        latestSourceExceptions.clear();
+        latestSinkExceptions.clear();
     }
 
     public String getStatsAsString() throws IOException {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e55fabc..9e727a5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -419,6 +419,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             this.sink.write(new SinkRecord<>(srcRecord, output));
         } catch (Exception e) {
             log.info("Encountered exception in sink write: ", e);
+            stats.incrSinkExceptions(e);
             throw new RuntimeException(e);
         }
     }
@@ -428,6 +429,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         try {
             record = this.source.read();
         } catch (Exception e) {
+            stats.incrSourceExceptions(e);
             log.info("Encountered exception in source read: ", e);
             throw new RuntimeException(e);
         }
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 13bb206..1b50ea8 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -39,6 +39,10 @@ message FunctionStatus {
     repeated ExceptionInformation latestUserExceptions = 7;
     int64 numSystemExceptions = 8;
     repeated ExceptionInformation latestSystemExceptions = 9;
+    int64 numSourceExceptions = 18;
+    repeated ExceptionInformation latestSourceExceptions = 19;
+    int64 numSinkExceptions = 20;
+    repeated ExceptionInformation latestSinkExceptions = 21;
     // map from topic name to number of deserialization exceptions
 //    map<string, int64> deserializationExceptions = 10;
     // number of serialization exceptions on the output
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 355cf11..9a6f739 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -74,7 +74,9 @@ public class FunctionsImpl extends ComponentImpl {
             }
             functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
 
-            functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            // For regular functions source/sink errors are system exceptions
+            functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions()
+                    + status.getNumSourceExceptions() + status.getNumSinkExceptions());
             List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
                 ExceptionInformation exceptionInformation
@@ -83,6 +85,20 @@ public class FunctionsImpl extends ComponentImpl {
                 exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                 systemExceptionInformationList.add(exceptionInformation);
             }
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
             functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
 
             functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 45bbba4..6d70fbb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -62,17 +62,18 @@ public class SinkImpl extends ComponentImpl {
             sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
             sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived());
 
-            List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
+            // We treat source/user/system exceptions returned from function as system exceptions
+            sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions()
+                    + status.getNumUserExceptions() + status.getNumSourceExceptions());
+            List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
                 ExceptionInformation exceptionInformation
                         = new ExceptionInformation();
                 exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                 exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-                userExceptionInformationList.add(exceptionInformation);
+                systemExceptionInformationList.add(exceptionInformation);
             }
 
-            sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
-            List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
                 ExceptionInformation exceptionInformation
                         = new ExceptionInformation();
@@ -80,8 +81,27 @@ public class SinkImpl extends ComponentImpl {
                 exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                 systemExceptionInformationList.add(exceptionInformation);
             }
+
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
             sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
 
+            sinkInstanceStatusData.setNumSinkExceptions(status.getNumSinkExceptions());
+            List<ExceptionInformation> sinkExceptionInformationList = new LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                sinkExceptionInformationList.add(exceptionInformation);
+            }
+            sinkInstanceStatusData.setLatestSinkExceptions(sinkExceptionInformationList);
+
             sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed());
             sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
             sinkInstanceStatusData.setWorkerId(assignedWorkerId);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index f05eea6..8c3c988 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -61,17 +61,29 @@ public class SourceImpl extends ComponentImpl {
             sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
             sourceInstanceStatusData.setNumReceivedFromSource(status.getNumReceived());
 
-            List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
-            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
+            sourceInstanceStatusData.setNumSourceExceptions(status.getNumSourceExceptions());
+            List<ExceptionInformation> sourceExceptionInformationList = new LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
                 ExceptionInformation exceptionInformation
                         = new ExceptionInformation();
                 exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                 exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
-                userExceptionInformationList.add(exceptionInformation);
+                sourceExceptionInformationList.add(exceptionInformation);
             }
+            sourceInstanceStatusData.setLatestSourceExceptions(sourceExceptionInformationList);
 
-            sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
+            // Source treats all system, user and sink exceptions as system exceptions
+            sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions()
+                    + status.getNumUserExceptions() + status.getNumSinkExceptions());
             List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
+
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
                 ExceptionInformation exceptionInformation
                         = new ExceptionInformation();
@@ -79,6 +91,14 @@ public class SourceImpl extends ComponentImpl {
                 exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                 systemExceptionInformationList.add(exceptionInformation);
             }
+
+            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
+                ExceptionInformation exceptionInformation
+                        = new ExceptionInformation();
+                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                systemExceptionInformationList.add(exceptionInformation);
+            }
             sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
 
             sourceInstanceStatusData.setNumWritten(status.getNumSuccessfullyProcessed());