You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/15 22:18:20 UTC
[incubator-pulsar] branch master updated: Functions metrics
prometheus (#1373)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6fbd8c3 Functions metrics prometheus (#1373)
6fbd8c3 is described below
commit 6fbd8c3c63c66cd1bfc08d04c2c553ffb78e0b55
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Mar 15 15:18:18 2018 -0700
Functions metrics prometheus (#1373)
* adding pulsar function stats to broker prometheus
* refactoring class name
* adding missing license header
* refactoring code and removing function's metrics sink
* fixing header
* adding instance liveness check
* adding null check
* adding unittest
---
conf/functions_worker.yml | 7 +-
.../stats/prometheus/NamespaceStatsAggregator.java | 2 +-
.../prometheus/PrometheusMetricsGenerator.java | 6 +-
.../stats/prometheus/PrometheusMetricsServlet.java | 1 -
.../pulsar/broker/stats/prometheus/TopicStats.java | 2 +-
.../org/apache/pulsar/utils/StatsOutputStream.java | 1 +
.../pulsar/utils/SimpleTextOutputStreamTest.java | 3 +-
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 3 +-
.../apache/pulsar/common/util}/NumberFormat.java | 2 +-
.../common/util}/SimpleTextOutputStream.java | 9 +-
.../pulsar/functions/runtime/JavaInstanceMain.java | 21 ++-
.../pulsar/functions/runtime/RuntimeSpawner.java | 50 ++---
pulsar-functions/worker/pom.xml | 6 +
.../pulsar/functions/worker/FunctionActioner.java | 11 +-
.../functions/worker/FunctionRuntimeManager.java | 24 +--
.../functions/worker/FunctionsStatsGenerator.java | 93 ++++++++++
.../pulsar/functions/worker/WorkerConfig.java | 28 +--
.../pulsar/functions/worker/WorkerService.java | 15 --
.../pulsar/functions/worker/rest/Resources.java | 21 ++-
.../pulsar/functions/worker/rest/WorkerServer.java | 27 ++-
.../worker/rest/api/FunctionsMetricsResource.java | 61 +++++++
.../worker/rest/api/v2/FunctionApiV2Resource.java | 1 -
.../worker/FunctionRuntimeManagerTest.java | 3 -
.../worker/FunctionStatsGeneratorTest.java | 201 +++++++++++++++++++++
.../functions/worker/MembershipManagerTest.java | 2 -
.../functions/worker/SchedulerManagerTest.java | 2 -
26 files changed, 455 insertions(+), 147 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index daaeae9..ff9d231 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -28,12 +28,6 @@ pulsarServiceUrl: pulsar://localhost:6650
pulsarWebServiceUrl: http://localhost:8080
numFunctionPackageReplicas: 1
downloadDirectory: /tmp/pulsar_functions
-metricsConfig:
- metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink
- metricsCollectionInterval: 30
- metricsSinkConfig:
- path: /metrics
- port: 9099
#threadContainerFactory:
# threadGroupName: "Thread Function Container Group"
processContainerFactory:
@@ -45,3 +39,4 @@ failureCheckFreqMs: 30000
rescheduleTimeoutMs: 60000
initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
+instanceLivenessCheckFreqMs: 30000
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 18693da..22115ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import io.netty.util.concurrent.FastThreadLocal;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index e7bcda1..167ec1c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,7 +24,7 @@ import java.util.Enumeration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -36,6 +36,7 @@ import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;
/**
* Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
@@ -71,6 +72,9 @@ public class PrometheusMetricsGenerator {
NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, stream);
+ FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
+ pulsar.getConfiguration().getClusterName(), stream);
+
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
buf.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 0dc8749..b924177 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -60,7 +60,6 @@ public class PrometheusMetricsServlet extends HttpServlet {
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
AsyncContext context = request.startAsync();
-
executor.execute(safeRun(() -> {
HttpServletResponse res = (HttpServletResponse) context.getResponse();
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index a750b75..f5938b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.stats.prometheus;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import java.util.HashMap;
import java.util.Map;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
index 7bdc66b..90d5c3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/StatsOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.utils;
import java.util.Stack;
import io.netty.buffer.ByteBuf;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
public class StatsOutputStream extends SimpleTextOutputStream {
private final Stack<Boolean> separators = new Stack<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
index 57a5b64..030ff30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleTextOutputStreamTest.java
@@ -22,8 +22,7 @@ import static org.testng.Assert.assertEquals;
import java.nio.charset.Charset;
-import org.apache.pulsar.utils.SimpleTextOutputStream;
-import org.apache.pulsar.utils.StatsOutputStream;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index ae348ae..b38aed2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -477,8 +477,7 @@ public class CmdFunctions extends CmdBase {
instanceConfig,
userCodeFile,
containerFactory,
- null,
- 0);
+ null);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
similarity index 98%
rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
index 0db4adb..eaf3664 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/NumberFormat.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NumberFormat.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.utils;
+package org.apache.pulsar.common.util;
import io.netty.buffer.ByteBuf;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
similarity index 94%
rename from pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index 091c529..3f75fde 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.utils;
+package org.apache.pulsar.common.util;
import io.netty.buffer.ByteBuf;
@@ -99,17 +99,17 @@ public class SimpleTextOutputStream {
return this;
}
- SimpleTextOutputStream write(boolean b) {
+ public SimpleTextOutputStream write(boolean b) {
write(b ? "true" : "false");
return this;
}
- SimpleTextOutputStream write(long n) {
+ public SimpleTextOutputStream write(long n) {
NumberFormat.format(this.buffer, n);
return this;
}
- SimpleTextOutputStream write(double d) {
+ public SimpleTextOutputStream write(double d) {
long i = (long) d;
write(i);
@@ -131,5 +131,4 @@ public class SimpleTextOutputStream {
write(r);
return this;
}
-
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 08bfbe4..594ea10 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import java.lang.reflect.Type;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
/**
* A function container implemented using java thread.
@@ -166,8 +167,7 @@ public class JavaInstanceMain {
instanceConfig,
jarFile,
containerFactory,
- null,
- 0);
+ null);
server = ServerBuilder.forPort(port)
.addService(new InstanceControlImpl(runtimeSpawner))
@@ -221,5 +221,22 @@ public class JavaInstanceMain {
throw new RuntimeException(e);
}
}
+
+ @Override
+ public void getAndResetMetrics(com.google.protobuf.Empty request,
+ io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
+ Runtime runtime = runtimeSpawner.getRuntime();
+ if (runtime != null) {
+ try {
+ InstanceCommunication.MetricsData metrics = runtime.getAndResetMetrics().get();
+ responseObserver.onNext(metrics);
+ responseObserver.onCompleted();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Exception in JavaInstance doing getAndResetMetrics", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index d6c203a..6bcb988 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -31,10 +31,6 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.metrics.MetricsSink;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
@Slf4j
public class RuntimeSpawner implements AutoCloseable {
@@ -45,22 +41,19 @@ public class RuntimeSpawner implements AutoCloseable {
@Getter
private Runtime runtime;
- private MetricsSink metricsSink;
- private int metricsCollectionInterval;
- private Timer metricsCollectionTimer;
+ private Timer processLivenessCheckTimer;
private int numRestarts;
+ private Long instanceLivenessCheckFreqMs;
+
public RuntimeSpawner(InstanceConfig instanceConfig,
String codeFile,
- RuntimeFactory containerFactory,
- MetricsSink metricsSink,
- int metricsCollectionInterval) {
+ RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) {
this.instanceConfig = instanceConfig;
this.runtimeFactory = containerFactory;
this.codeFile = codeFile;
- this.metricsSink = metricsSink;
- this.metricsCollectionInterval = metricsCollectionInterval;
this.numRestarts = 0;
+ this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
}
public void start() throws Exception {
@@ -68,34 +61,21 @@ public class RuntimeSpawner implements AutoCloseable {
this.instanceConfig.getInstanceId());
runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile);
runtime.start();
- if (metricsSink != null) {
- log.info("Scheduling Metrics Collection every {} secs for {} - {}",
- metricsCollectionInterval,
- FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
- instanceConfig.getInstanceId());
- metricsCollectionTimer = new Timer();
- metricsCollectionTimer.scheduleAtFixedRate(new TimerTask() {
+
+ // monitor function runtime to make sure it is running. If not, restart the function runtime
+ if (instanceLivenessCheckFreqMs != null) {
+ processLivenessCheckTimer = new Timer();
+ processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
- if (runtime.isAlive()) {
-
- log.info("Collecting metrics for function {} - {}",
- FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()),
- instanceConfig.getInstanceId());
- runtime.getAndResetMetrics().thenAccept(t -> {
- if (t != null) {
- log.debug("Collected metrics {}", t);
- metricsSink.processRecord(t, instanceConfig.getFunctionConfig());
- }
- });
- } else {
+ if (!runtime.isAlive()) {
log.error("Function Container is dead with exception", runtime.getDeathException());
log.error("Restarting...");
runtime.start();
numRestarts++;
}
}
- }, metricsCollectionInterval * 1000, metricsCollectionInterval * 1000);
+ }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
}
}
@@ -122,9 +102,9 @@ public class RuntimeSpawner implements AutoCloseable {
runtime.stop();
runtime = null;
}
- if (metricsCollectionTimer != null) {
- metricsCollectionTimer.cancel();
- metricsCollectionTimer = null;
+ if (processLivenessCheckTimer != null) {
+ processLivenessCheckTimer.cancel();
+ processLivenessCheckTimer = null;
}
}
}
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 3dab594..a4e9f03 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -119,6 +119,12 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
</dependency>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 296e7d0..1eb3d28 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.metrics.MetricsSink;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
@@ -52,8 +51,6 @@ public class FunctionActioner implements AutoCloseable {
private final WorkerConfig workerConfig;
private final RuntimeFactory runtimeFactory;
- private final MetricsSink metricsSink;
- private final int metricsCollectionInterval;
private final Namespace dlogNamespace;
private LinkedBlockingQueue<FunctionAction> actionQueue;
private volatile boolean running;
@@ -61,14 +58,10 @@ public class FunctionActioner implements AutoCloseable {
public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
- MetricsSink metricsSink,
- int metricCollectionInterval,
Namespace dlogNamespace,
LinkedBlockingQueue<FunctionAction> actionQueue) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
- this.metricsSink = metricsSink;
- this.metricsCollectionInterval = metricCollectionInterval;
this.dlogNamespace = dlogNamespace;
this.actionQueue = actionQueue;
actioner = new Thread(() -> {
@@ -168,8 +161,8 @@ public class FunctionActioner implements AutoCloseable {
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
- RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(), runtimeFactory,
- metricsSink, metricsCollectionInterval);
+ RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
+ runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 1d65078..a20c782 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
-import org.apache.pulsar.functions.metrics.MetricsSink;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import javax.ws.rs.client.Client;
@@ -73,8 +72,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
private final FunctionAssignmentTailer functionAssignmentTailer;
- private MetricsSink metricsSink;
-
private FunctionActioner functionActioner;
private RuntimeFactory runtimeFactory;
@@ -110,12 +107,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
throw new RuntimeException("Either Thread or Process Container Factory need to be set");
}
- this.metricsSink = createMetricsSink();
-
this.actionQueue = new LinkedBlockingQueue<>();
this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
- this.metricsSink, this.workerConfig.getMetricsConfig().getMetricsCollectionInterval(),
dlogNamespace, actionQueue);
this.membershipManager = membershipManager;
@@ -127,7 +121,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
public void start() {
log.info("/** Starting Function Runtime Manager **/");
log.info("Initialize metrics sink...");
- this.metricsSink.init(this.workerConfig.getMetricsConfig().getMetricsSinkConfig());
log.info("Starting function actioner...");
this.functionActioner.start();
log.info("Starting function assignment tailer...");
@@ -428,6 +421,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
}
+ public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
+ return this.functionRuntimeInfoMap;
+ }
/**
* Private methods for internal use. Should not be used outside of this class
*/
@@ -517,20 +513,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
this.functionAssignmentTailer.close();
}
- private MetricsSink createMetricsSink() {
- String className = workerConfig.getMetricsConfig().getMetricsSinkClassName();
- try {
- MetricsSink sink = (MetricsSink) Class.forName(className).newInstance();
- return sink;
- } catch (InstantiationException e) {
- throw new RuntimeException(e + " IMetricsSink class must have a no-arg constructor.");
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e + " IMetricsSink class must be concrete.");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e + " IMetricsSink class must be a class path.");
- }
- }
-
private Map<String, Assignment> diff(Map<String, Assignment> assignmentMap1, Map<String, Assignment> assignmentMap2) {
Map<String, Assignment> result = new HashMap<>();
for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
new file mode 100644
index 0000000..c9fd539
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A class to generate stats for pulsar functions running on this broker
+ */
+public class FunctionsStatsGenerator {
+
+ private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
+
+ public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
+ if (workerService != null) {
+ Map<String, FunctionRuntimeInfo> functionRuntimes
+ = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
+
+ for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
+ String fullyQualifiedInstanceName = entry.getKey();
+ FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+ RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+
+ if (functionRuntimeSpawner != null) {
+ Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+ if (functionRuntime != null) {
+ try {
+ InstanceCommunication.MetricsData metrics = functionRuntime.getAndResetMetrics().get();
+ for (Map.Entry<String, InstanceCommunication.MetricsData.DataDigest> metricsEntry
+ : metrics.getMetricsMap().entrySet()) {
+ String metricName = metricsEntry.getKey();
+ InstanceCommunication.MetricsData.DataDigest dataDigest = metricsEntry.getValue();
+
+ String tenant = functionRuntimeInfo.getFunctionInstance()
+ .getFunctionMetaData().getFunctionConfig().getTenant();
+ String namespace = functionRuntimeInfo.getFunctionInstance()
+ .getFunctionMetaData().getFunctionConfig().getNamespace();
+ String name = functionRuntimeInfo.getFunctionInstance()
+ .getFunctionMetaData().getFunctionConfig().getName();
+ int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+ String qualifiedNamespace = String.format("%s/%s", tenant, namespace);
+
+ metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%scount", metricName),
+ instanceId, dataDigest.getCount());
+ metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%smax", metricName),
+ instanceId, dataDigest.getMax());
+ metric(out, cluster, qualifiedNamespace,name, String.format("pulsar_function%smin", metricName),
+ instanceId, dataDigest.getMin());
+ metric(out, cluster, qualifiedNamespace, name, String.format("pulsar_function%ssum", metricName),
+ instanceId, dataDigest.getSum());
+
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Failed to collect metrics for function instance {}",
+ fullyQualifiedInstanceName, e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private static void metric(SimpleTextOutputStream stream, String cluster, String namespace,
+ String functionName, String metricName, int instanceId, double value) {
+ stream.write(metricName).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+ .write("\", name=\"").write(functionName).write("\", instanceId=\"").write(instanceId).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 579572a..fb875b5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -49,7 +49,6 @@ public class WorkerConfig implements Serializable {
private String pulsarFunctionsNamespace;
private int numFunctionPackageReplicas;
private String downloadDirectory;
- private MetricsConfig metricsConfig;
private long snapshotFreqMs;
private String stateStorageServiceUrl;
private String functionAssignmentTopicName;
@@ -58,6 +57,7 @@ public class WorkerConfig implements Serializable {
private long rescheduleTimeoutMs;
private int initialBrokerReconnectMaxRetries;
private int assignmentWriteMaxRetries;
+ private long instanceLivenessCheckFreqMs;
@Data
@Setter
@@ -97,30 +97,4 @@ public class WorkerConfig implements Serializable {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), WorkerConfig.class);
}
-
- @Data
- @Setter
- @Getter
- @EqualsAndHashCode
- @ToString
- @AllArgsConstructor
- @NoArgsConstructor
- @Accessors(chain = true)
- /**
- * This represents the config related to the resource limits of function calls
- */
- public static class MetricsConfig implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private String metricsSinkClassName;
- private int metricsCollectionInterval;
- private Map<String, String> metricsSinkConfig;
-
- public static MetricsConfig load(String yamlFile) throws IOException {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile), MetricsConfig.class);
- }
-
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 732ab07..20ffd8d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -58,21 +58,6 @@ public class WorkerService {
this.workerConfig = workerConfig;
}
- public static ServletContextHandler newServletContextHandler(String contextPath, WorkerService workerService) {
- final ResourceConfig config = new ResourceConfig(Resources.get());
- final ServletContextHandler contextHandler =
- new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
-
- contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService);
- contextHandler.setContextPath(contextPath);
-
- final ServletHolder apiServlet =
- new ServletHolder(new ServletContainer(config));
- contextHandler.addServlet(apiServlet, "/*");
-
- return contextHandler;
- }
-
public void start() throws InterruptedException {
try {
start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 702ab4c..ff43e25 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker.rest;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
@@ -31,15 +32,19 @@ public final class Resources {
private Resources() {
}
- public static Set<Class<?>> get() {
- return new HashSet<>(getClasses());
+ public static Set<Class<?>> getApiResources() {
+ return new HashSet<>(
+ Arrays.asList(
+ FunctionApiV2Resource.class,
+ MultiPartFeature.class
+ ));
}
- private static List<Class<?>> getClasses() {
- return Arrays.asList(
- ConfigurationResource.class,
- FunctionApiV2Resource.class,
- MultiPartFeature.class
- );
+ public static Set<Class<?>> getRootResources() {
+ return new HashSet<>(
+ Arrays.asList(
+ ConfigurationResource.class,
+ FunctionsMetricsResource.class
+ ));
}
}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 0e897da..524a6ad 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -31,6 +31,10 @@ import java.net.URI;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.servlet.ServletContainer;
@Slf4j
public class WorkerServer implements Runnable {
@@ -57,8 +61,13 @@ public class WorkerServer implements Runnable {
final Server server = new Server(this.workerConfig.getWorkerPort());
List<Handler> handlers = new ArrayList<>(2);
- handlers.add(WorkerService.newServletContextHandler("/admin", workerService));
- handlers.add(WorkerService.newServletContextHandler("/admin/v2", workerService));
+ handlers.add(newServletContextHandler("/admin",
+ new ResourceConfig(Resources.getApiResources()), workerService));
+ handlers.add(newServletContextHandler("/admin/v2",
+ new ResourceConfig(Resources.getApiResources()), workerService));
+ handlers.add(newServletContextHandler("/",
+ new ResourceConfig(Resources.getRootResources()), workerService));
+
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
HandlerCollection handlerCollection = new HandlerCollection();
@@ -86,4 +95,18 @@ public class WorkerServer implements Runnable {
public String getThreadName() {
return "worker-server-thread-" + this.workerConfig.getWorkerId();
}
+
+ public static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService) {
+ final ServletContextHandler contextHandler =
+ new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+
+ contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, workerService);
+ contextHandler.setContextPath(contextPath);
+
+ final ServletHolder apiServlet =
+ new ServletHolder(new ServletContainer(config));
+ contextHandler.addServlet(apiServlet, "/*");
+
+ return contextHandler;
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
new file mode 100644
index 0000000..0b87060
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsMetricsResource.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.functions.worker.FunctionsStatsGenerator;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+@Path("/")
+public class FunctionsMetricsResource extends FunctionApiResource {
+ @Path("metrics")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response getMetrics() throws JsonProcessingException {
+
+ WorkerService workerService = get();
+
+ ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+ try {
+ SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
+ FunctionsStatsGenerator.generate(workerService,"default", stream);
+ byte[] payload = buf.array();
+ int arrayOffset = buf.arrayOffset();
+ int readableBytes = buf.readableBytes();
+ StreamingOutput streamOut = out -> {
+ out.write(payload, arrayOffset, readableBytes);
+ out.flush();
+ };
+ return Response.ok(streamOut).build();
+ } finally {
+ buf.release();
+ }
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 25ecf0a..7effcd8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -128,5 +128,4 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response getAssignments() {
return functions.getAssignments();
}
-
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 327b4f6..3bca55c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -79,7 +79,6 @@ public class FunctionRuntimeManagerTest {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
- workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
@@ -173,7 +172,6 @@ public class FunctionRuntimeManagerTest {
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
- workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
@@ -271,7 +269,6 @@ public class FunctionRuntimeManagerTest {
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
- workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(TestSink.class.getName()));
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
new file mode 100644
index 0000000..5bdb812
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import lombok.ToString;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+
+public class FunctionStatsGeneratorTest {
+
+ @Test
+ public void testFunctionsStatsGenerate() {
+ FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
+ Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new HashMap<>();
+
+ WorkerService workerService = mock(WorkerService.class);
+ doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager();
+
+ CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>();
+ InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()
+ .putMetrics(
+ "__total_processed__",
+ InstanceCommunication.MetricsData.DataDigest.newBuilder()
+ .setCount(100.0).setMax(200.0).setSum(300.0).setMin(0.0).build())
+ .putMetrics("__avg_latency_ms__",
+ InstanceCommunication.MetricsData.DataDigest.newBuilder()
+ .setCount(10.0).setMax(20.0).setSum(30.0).setMin(0.0).build())
+ .build();
+
+ metricsDataCompletableFuture.complete(metricsData);
+ Runtime runtime = mock(Runtime.class);
+ doReturn(metricsDataCompletableFuture).when(runtime).getAndResetMetrics();
+
+ RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
+ doReturn(runtime).when(runtimeSpawner).getRuntime();
+
+ Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionConfig(
+ Function.FunctionConfig.newBuilder()
+ .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+ Function.Instance instance = Function.Instance.newBuilder()
+ .setFunctionMetaData(function1).setInstanceId(0).build();
+
+ FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+ doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
+ doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+
+ functionRuntimeInfoMap.put(Utils.getFullyQualifiedInstanceId(instance), functionRuntimeInfo);
+ doReturn(functionRuntimeInfoMap).when(functionRuntimeManager).getFunctionRuntimeInfos();
+
+ ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+ SimpleTextOutputStream statsOut = new SimpleTextOutputStream(buf);
+ FunctionsStatsGenerator.generate(workerService, "default", statsOut);
+
+ String str = buf.toString(Charset.defaultCharset());
+ buf.release();
+ Map<String, Metric> metrics = parseMetrics(str);
+
+ Assert.assertEquals(metrics.size(), 8);
+
+ Metric m = metrics.get("pulsar_function__total_processed__count");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 100.0);
+
+ m = metrics.get("pulsar_function__total_processed__max");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 200.0);
+
+ m = metrics.get("pulsar_function__total_processed__sum");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 300.0);
+
+ m = metrics.get("pulsar_function__total_processed__min");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 0.0);
+
+ m = metrics.get("pulsar_function__avg_latency_ms__count");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 10.0);
+
+ m = metrics.get("pulsar_function__avg_latency_ms__max");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 20.0);
+
+ m = metrics.get("pulsar_function__avg_latency_ms__sum");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 30.0);
+
+ m = metrics.get("pulsar_function__avg_latency_ms__min");
+ assertEquals(m.tags.get("cluster"), "default");
+ assertEquals(m.tags.get("instanceId"), "0");
+ assertEquals(m.tags.get("name"), "func-1");
+ assertEquals(m.tags.get("namespace"), "test-tenant/test-namespace");
+ assertEquals(m.value, 0.0);
+ }
+
+ /**
+ * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
+ */
+ private static Map<String, Metric> parseMetrics(String metrics) {
+ Map<String, Metric> parsed = new HashMap<>();
+
+ // Example of lines are
+ // jvm_threads_current{cluster="standalone",} 203.0
+ // or
+ // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+ // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+ Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.]+)(\\s(\\d+))?$");
+ Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+ Arrays.asList(metrics.split("\n")).forEach(line -> {
+ if (line.isEmpty()) {
+ return;
+ }
+ Matcher matcher = pattern.matcher(line);
+
+ checkArgument(matcher.matches());
+ String name = matcher.group(1);
+
+ Metric m = new Metric();
+ m.value = Double.valueOf(matcher.group(3));
+
+ String tags = matcher.group(2);
+ Matcher tagsMatcher = tagsPattern.matcher(tags);
+ while (tagsMatcher.find()) {
+ String tag = tagsMatcher.group(1);
+ String value = tagsMatcher.group(2);
+ m.tags.put(tag, value);
+ }
+
+ parsed.put(name, m);
+ });
+
+ return parsed;
+ }
+
+ @ToString
+ static class Metric {
+ Map<String, String> tags = new TreeMap<>();
+ double value;
+ }
+
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 43b08a1..4fd72b9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -60,7 +60,6 @@ public class MembershipManagerTest {
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
- workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
}
@Test
@@ -250,7 +249,6 @@ public class MembershipManagerTest {
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
- workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig().setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index a5ada20..b115b17 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -69,8 +69,6 @@ public class SchedulerManagerTest {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
- workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig()
- .setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
workerConfig.setAssignmentWriteMaxRetries(0);
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.