You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/14 17:55:04 UTC
[pulsar] Diff for: [GitHub] srkukarni merged pull request #3363: Enable
stats to be recovered by Kubernetes runtime
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 3a0a404e69..43467ae2b9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -317,7 +317,7 @@ public void getMetrics(com.google.protobuf.Empty request,
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
- InstanceCommunication.MetricsData metrics = runtime.getMetrics().get();
+ InstanceCommunication.MetricsData metrics = runtime.getMetrics(instanceId).get();
responseObserver.onNext(metrics);
responseObserver.onCompleted();
} catch (InterruptedException | ExecutionException e) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index a3a006a427..aba473b18c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -295,9 +295,31 @@ public void onSuccess(FunctionStatus t) {
}
@Override
- public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+ public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
- retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support getting metrics via rest"));
+ // Check liveness first: reading stub.length on a null stub would throw a NullPointerException.
+ if (stub == null) {
+ retval.completeExceptionally(new RuntimeException("Not alive"));
+ return retval;
+ }
+ // Fail the future for ids outside the stub array instead of letting stub[instanceId] throw.
+ if (instanceId < 0 || instanceId >= stub.length) {
+ retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
+ return retval;
+ }
+ ListenableFuture<InstanceCommunication.MetricsData> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build());
+ Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ InstanceCommunication.MetricsData.Builder builder = InstanceCommunication.MetricsData.newBuilder();
+ retval.complete(builder.build());
+ }
+
+ @Override
+ public void onSuccess(InstanceCommunication.MetricsData t) {
+ retval.complete(t);
+ }
+ });
return retval;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index d3046ba99b..d0877ebdcf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -267,7 +267,7 @@ public void onSuccess(Empty t) {
}
@Override
- public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+ public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
if (stub == null) {
retval.completeExceptionally(new RuntimeException("Not alive"));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index fafdca7123..19a5fc9eee 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -45,7 +45,7 @@
CompletableFuture<Void> resetMetrics();
- CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
+ CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId);
String getPrometheusMetrics() throws IOException;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index be049c38f9..46da24bdc3 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -153,7 +153,7 @@ public void stop() {
@Override
- public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+ public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 152c1db622..30160b0d0b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -482,7 +482,7 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
- return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics();
+ return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
}
return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
} else {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 42405068b1..3c1fa4c8a1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -185,7 +185,9 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin
}
}
- public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo functionRuntimeInfo) {
+ public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName,
+ FunctionRuntimeInfo functionRuntimeInfo,
+ int instanceId) {
RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
@@ -194,8 +196,7 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin
if (functionRuntime != null) {
try {
- InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get();
- int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+ InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics(instanceId).get();
functionInstanceStats.setInstanceId(instanceId);
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData
@@ -229,14 +230,4 @@ public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, Strin
}
return functionInstanceStats;
}
-
- public static FunctionStats getFunctionStats(Map<String, FunctionRuntimeInfo> functionRuntimes) {
- FunctionStats functionStats = new FunctionStats();
- for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
- String fullyQualifiedInstanceName = entry.getKey();
- FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
- functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo));
- }
- return functionStats;
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 6351272027..79f4df224e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -158,14 +158,28 @@ public boolean isSuperUser(final String clientRole) {
String fullyQualifiedInstanceName = entry.getKey();
FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
- FunctionStats.FunctionInstanceStats functionInstanceStats =
- Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo);
-
- WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
- workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
- workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
-
- metricsList.add(workerFunctionInstanceStats);
+ if (workerService.getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
+ Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
+ int parallelism = functionDetails.getParallelism();
+ for (int i = 0; i < parallelism; ++i) {
+ FunctionStats.FunctionInstanceStats functionInstanceStats =
+ Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i);
+ WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
+ workerFunctionInstanceStats.setName(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(
+ functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), i
+ ));
+ workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+ metricsList.add(workerFunctionInstanceStats);
+ }
+ } else {
+ FunctionStats.FunctionInstanceStats functionInstanceStats =
+ Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
+ functionRuntimeInfo.getFunctionInstance().getInstanceId());
+ WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
+ workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
+ workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+ metricsList.add(workerFunctionInstanceStats);
+ }
}
return metricsList;
}
With regards,
Apache Git Services