You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/05 06:19:11 UTC

[pulsar] branch master updated: [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready (#2723)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b74e9d  [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready (#2723)
9b74e9d is described below

commit 9b74e9dc734f3c24290b2c81d3f492928fec7484
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Oct 4 23:18:58 2018 -0700

    [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready (#2723)
    
    * [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready
    
    *Motivation*
    
    NullPointerException was thrown when function worker is running as part of broker and metrics collection kicks in
    before worker service completes initialization
    
    *Changes*
    
    Only generate functions when worker service is ready
    
    * Fix FunctionSTatsGeneratorTest
---
 .../pulsar/functions/worker/FunctionsStatsGenerator.java  |  3 ++-
 .../functions/worker/FunctionStatsGeneratorTest.java      | 15 +++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index be7c88b..3e219d6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -38,7 +38,8 @@ public class FunctionsStatsGenerator {
     private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
 
     public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
-        if (workerService != null) {
+        // only when worker service is initialized, we generate the stats. otherwise we will get bunch of NPE.
+        if (workerService != null && workerService.isInitialized()) {
             Map<String, FunctionRuntimeInfo> functionRuntimes
                     = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 9816822..849d05d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import lombok.ToString;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.functions.proto.Function;
@@ -41,11 +42,24 @@ import java.util.regex.Pattern;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
 public class FunctionStatsGeneratorTest {
 
     @Test
+    public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() {
+        WorkerService workerService = mock(WorkerService.class);
+        when(workerService.isInitialized()).thenReturn(false);
+        FunctionsStatsGenerator.generate(
+            workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer()));
+        verify(workerService, times(1)).isInitialized();
+        verify(workerService, times(0)).getFunctionRuntimeManager();
+    }
+
+    @Test
     public void testFunctionsStatsGenerate() {
         FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
         Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new HashMap<>();
@@ -53,6 +67,7 @@ public class FunctionStatsGeneratorTest {
         WorkerService workerService = mock(WorkerService.class);
         doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
         doReturn(new WorkerConfig()).when(workerService).getWorkerConfig();
+        when(workerService.isInitialized()).thenReturn(true);
 
         CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
         InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()