You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/12/23 22:10:41 UTC

[pulsar] branch master updated: Fix: Empty exception message in java functions can cause an NPE (#3245)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f24a0b1  Fix: Empty exception message in java functions can cause an NPE (#3245)
f24a0b1 is described below

commit f24a0b191f178369b253642d89f28e5115008142
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Dec 23 14:10:36 2018 -0800

    Fix: Empty exception message in java functions can cause an NPE (#3245)
    
    * Fix: Empty exception message in java functions can cause an NPE
    
    * add additional exception message error null checking
---
 .../functions/instance/FunctionStatsManager.java   | 34 ++++++++++++----------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index 72b792e..818fdb3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -320,15 +320,13 @@ public class FunctionStatsManager implements AutoCloseable {
 
     public void addUserException(Exception ex) {
         long ts = System.currentTimeMillis();
-        InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                    InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                    .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts);
         latestUserExceptions.add(info);
 
         // report exception throw prometheus
         if (userExceptionRateLimiter.tryAcquire()) {
             String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
             exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
             userExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
@@ -336,15 +334,13 @@ public class FunctionStatsManager implements AutoCloseable {
 
     public void addSystemException(Throwable ex) {
         long ts = System.currentTimeMillis();
-        InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                        .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts);
         latestSystemExceptions.add(info);
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
             String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
             exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
@@ -352,15 +348,13 @@ public class FunctionStatsManager implements AutoCloseable {
 
     public void addSourceException(Throwable ex) {
         long ts = System.currentTimeMillis();
-        InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                        .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts);
         latestSourceExceptions.add(info);
 
         // report exception throw prometheus
         if (sourceExceptionRateLimiter.tryAcquire()) {
             String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
             exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
             sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
@@ -368,20 +362,28 @@ public class FunctionStatsManager implements AutoCloseable {
 
     public void addSinkException(Throwable ex) {
         long ts = System.currentTimeMillis();
-        InstanceCommunication.FunctionStatus.ExceptionInformation info =
-                InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
-                        .setExceptionString(ex.getMessage()).setMsSinceEpoch(ts).build();
+        InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); // helper skips the exception string when getMessage() is null
         latestSinkExceptions.add(info);
 
         // report exception throw prometheus
         if (sinkExceptionRateLimiter.tryAcquire()) {
             String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage();
+            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; // a null message becomes an empty label value
             exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
             sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
+    private InstanceCommunication.FunctionStatus.ExceptionInformation getExceptionInfo(Throwable th, long ts) { // builds the ExceptionInformation record for th at time ts
+        InstanceCommunication.FunctionStatus.ExceptionInformation.Builder exceptionInfoBuilder =
+                InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder().setMsSinceEpoch(ts); // the timestamp is always set
+        String msg = th.getMessage(); // Throwable.getMessage() may return null
+        if (msg != null) { // never hand a null to setExceptionString; the unguarded call is what this commit removes
+            exceptionInfoBuilder.setExceptionString(msg);
+        }
+        return exceptionInfoBuilder.build(); // the exception string stays unset when msg was null
+    }
+
     public void incrTotalReceived() {
         _statTotalRecordsRecieved.inc();
         _statTotalRecordsRecieved1min.inc();