You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/05 07:51:30 UTC

[GitHub] sijie closed pull request #2724: [functions][stats] don't generate function stats at worker service if runtime is k8s

sijie closed pull request #2724:  [functions][stats] don't generate function stats at worker service if runtime is k8s
URL: https://github.com/apache/pulsar/pull/2724
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 3e219d6c40..80c1b775fc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.eclipse.jetty.util.ConcurrentHashSet;
@@ -40,6 +41,11 @@
     public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
         // only when worker service is initialized, we generate the stats. otherwise we will get bunch of NPE.
         if (workerService != null && workerService.isInitialized()) {
+            // kubernetes runtime factory doesn't support stats collection through worker service
+            if (workerService.getFunctionRuntimeManager().getRuntimeFactory() instanceof KubernetesRuntimeFactory) {
+                return;
+            }
+
             Map<String, FunctionRuntimeInfo> functionRuntimes
                     = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index 849d05d504..68a13b4264 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.testng.Assert;
@@ -59,6 +60,20 @@ public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() {
         verify(workerService, times(0)).getFunctionRuntimeManager();
     }
 
+    @Test
+    public void testGenerateFunctionStatsOnK8SRuntimeFactory() {
+        WorkerService workerService = mock(WorkerService.class);
+        when(workerService.isInitialized()).thenReturn(true);
+        FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class);
+        when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class));
+        when(workerService.getFunctionRuntimeManager()).thenReturn(frm);
+        FunctionsStatsGenerator.generate(
+            workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer()));
+        verify(workerService, times(1)).isInitialized();
+        verify(workerService, times(1)).getFunctionRuntimeManager();
+        verify(frm, times(0)).getFunctionRuntimeInfos();
+    }
+
     @Test
     public void testFunctionsStatsGenerate() {
         FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services