You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/14 17:55:04 UTC

[pulsar] Diff for: [GitHub] srkukarni merged pull request #3363: Enable stats to be recovered by Kubernetes runtime

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 3a0a404e69..43467ae2b9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -317,7 +317,7 @@ public void getMetrics(com.google.protobuf.Empty request,
             Runtime runtime = runtimeSpawner.getRuntime();
             if (runtime != null) {
                 try {
-                    InstanceCommunication.MetricsData metrics = runtime.getMetrics().get();
+                    InstanceCommunication.MetricsData metrics = runtime.getMetrics(instanceId).get();
                     responseObserver.onNext(metrics);
                     responseObserver.onCompleted();
                 } catch (InterruptedException | ExecutionException e) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index a3a006a427..aba473b18c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -295,9 +295,31 @@ public void onSuccess(FunctionStatus t) {
     }
 
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
         CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
-        retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support getting metrics via rest"));
+        if (instanceId < 0 || instanceId >= stub.length) {
+            if (stub == null) {
+                retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
+                return retval;
+            }
+        }
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        ListenableFuture<InstanceCommunication.MetricsData> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                InstanceCommunication.MetricsData.Builder builder = InstanceCommunication.MetricsData.newBuilder();
+                retval.complete(builder.build());
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.MetricsData t) {
+                retval.complete(t);
+            }
+        });
         return retval;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index d3046ba99b..d0877ebdcf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -267,7 +267,7 @@ public void onSuccess(Empty t) {
     }
 
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
         CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
         if (stub == null) {
             retval.completeExceptionally(new RuntimeException("Not alive"));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index fafdca7123..19a5fc9eee 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -45,7 +45,7 @@
     
     CompletableFuture<Void> resetMetrics();
     
-    CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
+    CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId);
 
     String getPrometheusMetrics() throws IOException;
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index be049c38f9..46da24bdc3 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -153,7 +153,7 @@ public void stop() {
     
     
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
         return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 152c1db622..30160b0d0b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -482,7 +482,7 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
                     org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
-                return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics();
+                return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
             }
             return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
         } else {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 42405068b1..3c1fa4c8a1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -185,7 +185,9 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin
         }
     }
 
-    public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo functionRuntimeInfo) {
+    public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName,
+                                                                               FunctionRuntimeInfo functionRuntimeInfo,
+                                                                               int instanceId) {
         RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
 
         FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
@@ -194,8 +196,7 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin
             if (functionRuntime != null) {
                 try {
 
-                    InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
-                    int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                    InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics(instanceId).get();
                     functionInstanceStats.setInstanceId(instanceId);
 
                     FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData
@@ -229,14 +230,4 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin
         }
         return functionInstanceStats;
     }
-
-    public static FunctionStats getFunctionStats(Map<String, FunctionRuntimeInfo> functionRuntimes) {
-        FunctionStats functionStats = new FunctionStats();
-        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
-            String fullyQualifiedInstanceName = entry.getKey();
-            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
-            functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo));
-        }
-        return functionStats;
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 6351272027..79f4df224e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -158,14 +158,28 @@ public boolean isSuperUser(final String clientRole) {
             String fullyQualifiedInstanceName = entry.getKey();
             FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
 
-            FunctionStats.FunctionInstanceStats functionInstanceStats =
-                    Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
-
-            WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
-            workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
-            workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
-
-            metricsList.add(workerFunctionInstanceStats);
+            if (workerService.getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+                Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
+                int parallelism = functionDetails.getParallelism();
+                for (int i = 0; i < parallelism; ++i) {
+                    FunctionStats.FunctionInstanceStats functionInstanceStats =
+                            Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i);
+                    WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
+                    workerFunctionInstanceStats.setName(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(
+                            functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), i
+                    ));
+                    workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+                    metricsList.add(workerFunctionInstanceStats);
+                }
+            } else {
+                FunctionStats.FunctionInstanceStats functionInstanceStats =
+                        Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
+                                functionRuntimeInfo.getFunctionInstance().getInstanceId());
+                WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
+                workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
+                workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+                metricsList.add(workerFunctionInstanceStats);
+            }
         }
         return metricsList;
     }


With regards,
Apache Git Services