You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/12 01:36:30 UTC
[pulsar] branch master updated: Metrics Collector Agent for
Functions running in Kubernetes (#2773)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 89ccde0 Metrics Collector Agent for Functions running in Kubernetes (#2773)
89ccde0 is described below
commit 89ccde01617366ae1c5a6950c883b7aacac195e9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Oct 11 18:36:25 2018 -0700
Metrics Collector Agent for Functions running in Kubernetes (#2773)
### Motivation
Added an agent that scrapes metrics from the function container and exposes them on a port for Prometheus to scrape.
---
distribution/server/src/assemble/bin.xml | 5 +
pulsar-functions/metrics/pom.xml | 38 ++++++
.../functions/metrics/PrometheusMetricsServer.java | 137 +++++++++++++++++++++
.../functions/metrics/sink/AbstractWebSink.java | 6 +-
.../functions/runtime/KubernetesRuntime.java | 96 +++++++++++++--
.../runtime/KubernetesRuntimeFactory.java | 11 +-
.../functions/runtime/KubernetesRuntimeTest.java | 2 +-
.../functions/worker/FunctionRuntimeManager.java | 3 +-
.../pulsar/functions/worker/WorkerConfig.java | 1 +
9 files changed, 282 insertions(+), 17 deletions(-)
diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml
index 533c195..c9ad893 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -81,6 +81,11 @@
<outputDirectory>instances</outputDirectory>
</file>
<file>
+ <source>${basedir}/../../pulsar-functions/metrics/target/PrometheusMetricsServer.jar</source>
+ <destName>PrometheusMetricsServer.jar</destName>
+ <outputDirectory>instances</outputDirectory>
+ </file>
+ <file>
<source>${basedir}/../../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar</source>
<destName>api-examples.jar</destName>
<outputDirectory>examples</outputDirectory>
diff --git a/pulsar-functions/metrics/pom.xml b/pulsar-functions/metrics/pom.xml
index afa12cc..ca5e0a5 100644
--- a/pulsar-functions/metrics/pom.xml
+++ b/pulsar-functions/metrics/pom.xml
@@ -45,6 +45,44 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+
</dependencies>
+  <build>
+    <plugins>
+      <!-- Package the metrics agent together with its dependencies as a single jar, -->
+      <!-- target/PrometheusMetricsServer.jar, during the package phase. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <finalName>PrometheusMetricsServer</finalName>
+              <archive>
+                <manifest>
+                  <!-- Main-Class must be the fully-qualified class name; the class is -->
+                  <!-- declared in the org.apache.pulsar.functions.metrics package. -->
+                  <mainClass>org.apache.pulsar.functions.metrics.PrometheusMetricsServer</mainClass>
+                </manifest>
+              </archive>
+              <descriptorRefs>
+                <descriptorRef>jar-with-dependencies</descriptorRef>
+              </descriptorRefs>
+              <appendAssemblyId>false</appendAssemblyId>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
</project>
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
new file mode 100644
index 0000000..ef9f0ce
--- /dev/null
+++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.metrics;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.metrics.sink.AbstractWebSink;
+import org.apache.pulsar.functions.metrics.sink.PrometheusSink;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.*;
+
+/**
+ * A function container implemented using java thread.
+ */
+@Slf4j
+public class PrometheusMetricsServer {
+ @Parameter(names = "--function_details", description = "Function details json\n", required = true)
+ protected String functionDetailsJsonString;
+
+ @Parameter(names = "--prometheus_port", description = "Port to listen for prometheus requests\n", required = true)
+ protected int prometheusPort;
+
+ @Parameter(names = "--grpc_port", description = "GRPC Port to query the metrics from instance\n", required = true)
+ protected int grpc_port;
+
+ @Parameter(names = "--collection_interval", description = "Number in seconds between collection interval\n", required = true)
+ protected int metricsCollectionInterval;
+
+ private FunctionDetails functionDetails;
+ private MetricsSink metricsSink;
+ private ManagedChannel channel;
+ private InstanceControlGrpc.InstanceControlFutureStub stub;
+ private ScheduledExecutorService timer;
+
+ public PrometheusMetricsServer() { }
+
+
+ public void start() throws Exception {
+ FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+ if (functionDetailsJsonString.charAt(0) == '\'') {
+ functionDetailsJsonString = functionDetailsJsonString.substring(1);
+ }
+ if (functionDetailsJsonString.charAt(functionDetailsJsonString.length() - 1) == '\'') {
+ functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
+ }
+ JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
+ functionDetails = functionDetailsBuilder.build();
+
+ metricsSink = new PrometheusSink();
+ Map<String, String> config = new HashMap<>();
+ config.put(AbstractWebSink.KEY_PATH, "/metrics");
+ config.put(AbstractWebSink.KEY_PORT, String.valueOf(prometheusPort));
+ metricsSink.init(config);
+
+ channel = ManagedChannelBuilder.forAddress("127.0.0.1", grpc_port)
+ .usePlaintext(true)
+ .build();
+ stub = InstanceControlGrpc.newFutureStub(channel);
+
+ if (metricsCollectionInterval > 0) {
+ timer = Executors.newSingleThreadScheduledExecutor();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ CompletableFuture<InstanceCommunication.MetricsData> result = getMetrics();
+ try {
+ metricsSink.processRecord(result.get(), functionDetails);
+ } catch (Exception e) {
+ log.error("Getting metrics data failed {}/{}/{}",
+ functionDetails.getTenant(),
+ functionDetails.getNamespace(),
+ functionDetails.getName(),
+ e);
+ }
+ }
+ }, metricsCollectionInterval, metricsCollectionInterval, TimeUnit.SECONDS);
+ }
+ }
+
+ public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+ CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
+ ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(10, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build());
+ Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ retval.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onSuccess(InstanceCommunication.MetricsData t) {
+ retval.complete(t);
+ }
+ });
+ return retval;
+ }
+
+ public static void main(String[] args) throws Exception {
+ PrometheusMetricsServer server = new PrometheusMetricsServer();
+ JCommander jcommander = new JCommander(server);
+ jcommander.setProgramName("PrometheusMetricsServer");
+
+ // parse args by JCommander
+ jcommander.parse(args);
+ server.start();
+ }
+}
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
index 86f17d7..cb5541a 100644
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
+++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
@@ -39,16 +39,16 @@ import org.apache.pulsar.functions.metrics.MetricsSink;
/**
* A metrics sink that publishes metrics on a http endpoint
*/
-abstract class AbstractWebSink implements MetricsSink {
+abstract public class AbstractWebSink implements MetricsSink {
private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName());
private static final int HTTP_STATUS_OK = 200;
// Metrics will be published on http://host:port/path, the port
- private static final String KEY_PORT = "port";
+ public static final String KEY_PORT = "port";
// The path
- private static final String KEY_PATH = "path";
+ public static final String KEY_PATH = "path";
// Maximum number of metrics getting served
private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index a9e8c75..40ef003 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
import com.squareup.okhttp.Response;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -34,6 +35,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -62,6 +64,9 @@ class KubernetesRuntime implements Runtime {
private static final String ENV_SHARD_ID = "SHARD_ID";
private static final int maxJobNameSize = 55;
private static final Integer GRPC_PORT = 9093;
+ private static final Integer PROMETHEUS_PORT = 9094;
+ private static final Double prometheusMetricsServerCpu = 0.1;
+ private static final Long prometheusMetricsServerRam = 250000000l;
public static final Pattern VALID_POD_NAME_REGEX =
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
Pattern.CASE_INSENSITIVE);
@@ -80,6 +85,7 @@ class KubernetesRuntime implements Runtime {
// The thread that invokes the function
@Getter
private List<String> processArgs;
+ private List<String> prometheusMetricsServerArgs;
@Getter
private ManagedChannel[] channel;
private InstanceControlGrpc.InstanceControlFutureStub[] stub;
@@ -103,13 +109,15 @@ class KubernetesRuntime implements Runtime {
String pulsarRootDir,
InstanceConfig instanceConfig,
String instanceFile,
+ String prometheusMetricsServerJarFile,
String logDirectory,
String userCodePkgUrl,
String originalCodeFileName,
String pulsarServiceUrl,
String pulsarAdminUrl,
String stateStorageServiceUrl,
- AuthenticationConfig authConfig) throws Exception {
+ AuthenticationConfig authConfig,
+ Integer expectedMetricsInterval) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
@@ -122,6 +130,7 @@ class KubernetesRuntime implements Runtime {
this.pulsarAdminUrl = pulsarAdminUrl;
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, pulsarRootDir + "/conf/log4j2.yaml", installUserCodeDependencies);
+ this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
running = false;
doChecks(instanceConfig.getFunctionDetails());
}
@@ -379,6 +388,14 @@ class KubernetesRuntime implements Runtime {
);
}
+ protected List<String> getPrometheusMetricsServerCommand() {
+ return Arrays.asList(
+ "sh",
+ "-c",
+ String.join(" ", prometheusMetricsServerArgs)
+ );
+ }
+
private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
return Arrays.asList(
pulsarRootDir + "/bin/pulsar-admin",
@@ -428,10 +445,7 @@ class KubernetesRuntime implements Runtime {
// set up pod meta
final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(instanceConfig.getFunctionDetails()));
- /*
- TODO:- Figure out the metrics collection later.
templateMetaData.annotations(getPrometheusAnnotations());
- */
podTemplateSpec.setMetadata(templateMetaData);
final List<String> command = getExecutorCommand();
@@ -447,7 +461,7 @@ class KubernetesRuntime implements Runtime {
private Map<String, String> getPrometheusAnnotations() {
final Map<String, String> annotations = new HashMap<>();
annotations.put("prometheus.io/scrape", "true");
- annotations.put("prometheus.io/port", "8080");
+ annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT));
return annotations;
}
@@ -472,8 +486,10 @@ class KubernetesRuntime implements Runtime {
// https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
podSpec.setTolerations(getTolerations());
- podSpec.containers(Collections.singletonList(
- getContainer(instanceCommand, resource)));
+ List<V1Container> containers = new LinkedList<>();
+ containers.add(getFunctionContainer(instanceCommand, resource));
+ containers.add(getPrometheusContainer());
+ podSpec.containers(containers);
return podSpec;
}
@@ -493,7 +509,7 @@ class KubernetesRuntime implements Runtime {
return tolerations;
}
- private V1Container getContainer(List<String> instanceCommand, Function.Resources resource) {
+ private V1Container getFunctionContainer(List<String> instanceCommand, Function.Resources resource) {
final V1Container container = new V1Container().name("pulsarfunction");
// set up the container images
@@ -520,12 +536,44 @@ class KubernetesRuntime implements Runtime {
container.setResources(resourceRequirements);
// set container ports
- container.setPorts(getContainerPorts());
+ container.setPorts(getFunctionContainerPorts());
+
+ return container;
+ }
+
+ private V1Container getPrometheusContainer() { // builds the pod's second container, which runs the metrics server
+ final V1Container container = new V1Container().name("prometheusmetricsserver");
+
+ // the image comes from pulsarDockerImageName
+ container.setImage(pulsarDockerImageName);
+
+ // run the sh -c command line produced by getPrometheusMetricsServerCommand()
+ container.setCommand(getPrometheusMetricsServerCommand());
+
+ // expose the pod's own name (field metadata.name) to the container as POD_NAME
+ final V1EnvVar envVarPodName = new V1EnvVar();
+ envVarPodName.name("POD_NAME")
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("metadata.name")));
+ container.setEnv(Arrays.asList(envVarPodName));
+
+ // only resource requests are set in this method; no limits are configured here
+ // memory and cpu requests come from prometheusMetricsServerRam / prometheusMetricsServerCpu
+ final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
+ final Map<String, Quantity> requests = new HashMap<>();
+ requests.put("memory", Quantity.fromString(Long.toString(prometheusMetricsServerRam)));
+ requests.put("cpu", Quantity.fromString(Double.toString(prometheusMetricsServerCpu)));
+ resourceRequirements.setRequests(requests);
+ container.setResources(resourceRequirements);
+
+ // declare the ports returned by getPrometheusContainerPorts()
+ container.setPorts(getPrometheusContainerPorts());
return container;
}
- private List<V1ContainerPort> getContainerPorts() {
+ private List<V1ContainerPort> getFunctionContainerPorts() {
List<V1ContainerPort> ports = new ArrayList<>();
final V1ContainerPort port = new V1ContainerPort();
port.setName("grpc");
@@ -534,6 +582,15 @@ class KubernetesRuntime implements Runtime {
return ports;
}
+    /** Returns a one-element list declaring the container port named "prometheus" on PROMETHEUS_PORT. */
+    private List<V1ContainerPort> getPrometheusContainerPorts() {
+        final V1ContainerPort prometheusPort = new V1ContainerPort();
+        prometheusPort.setContainerPort(PROMETHEUS_PORT);
+        prometheusPort.setName("prometheus");
+        // hand back a fresh ArrayList holding just this port
+        return new ArrayList<>(Arrays.asList(prometheusPort));
+    }
+
private static String createJobName(Function.FunctionDetails functionDetails) {
return createJobName(functionDetails.getTenant(),
functionDetails.getNamespace(),
@@ -557,4 +614,23 @@ class KubernetesRuntime implements Runtime {
throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize);
}
}
+
+ private List<String> composePrometheusMetricsServerArgs(String prometheusMetricsServerFile,
+ Integer expectedMetricsInterval) throws Exception {
+ List<String> args = new LinkedList<>();
+ args.add("java");
+ args.add("-cp");
+ args.add(prometheusMetricsServerFile);
+ args.add("-Xmx" + String.valueOf(prometheusMetricsServerRam));
+ args.add(PrometheusMetricsServer.class.getName());
+ args.add("--function_details");
+ args.add("'" + JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails()) + "'");
+ args.add("--prometheus_port");
+ args.add(String.valueOf(PROMETHEUS_PORT));
+ args.add("--grpc_port");
+ args.add(String.valueOf(GRPC_PORT));
+ args.add("--collection_interval");
+ args.add(String.valueOf(expectedMetricsInterval));
+ return args;
+ }
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index dee265f..b257cbf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -53,7 +53,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private final AuthenticationConfig authConfig;
private final String javaInstanceJarFile;
private final String pythonInstanceFile;
+ private final String prometheusMetricsServerJarFile;
private final String logDirectory = "logs/functions";
+ private final Integer expectedMetricsInterval;
private AppsV1Api appsClient;
private CoreV1Api coreClient;
@@ -68,7 +70,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
String pulsarServiceUri,
String pulsarAdminUri,
String stateStorageServiceUri,
- AuthenticationConfig authConfig) {
+ AuthenticationConfig authConfig,
+ Integer expectedMetricsInterval) {
this.k8Uri = k8Uri;
if (!isEmpty(jobNamespace)) {
this.jobNamespace = jobNamespace;
@@ -94,6 +97,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.authConfig = authConfig;
this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
+ this.prometheusMetricsServerJarFile = this.pulsarRootDir + "/instances/PrometheusMetricsServer.jar";
+ this.expectedMetricsInterval = expectedMetricsInterval == null ? -1 : expectedMetricsInterval;
}
@Override
@@ -127,13 +132,15 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
pulsarRootDir,
instanceConfig,
instanceFile,
+ prometheusMetricsServerJarFile,
logDirectory,
codePkgUrl,
originalCodeFileName,
pulsarServiceUri,
pulsarAdminUri,
stateStorageServiceUri,
- authConfig);
+ authConfig,
+ expectedMetricsInterval);
}
@Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 11e240a..895a1e8 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public class KubernetesRuntimeTest {
this.stateStorageServiceUrl = "bk://localhost:4181";
this.logDirectory = "logs/functions";
this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
- false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null));
+ false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
doNothing().when(this.factory).setupClient();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 47d317f..2514ff6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -130,7 +130,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
workerConfig.getStateStorageServiceUrl(),
- authConfig);
+ authConfig,
+ workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval());
} else {
throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index c18f824..bc97969 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -141,6 +141,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private String pulsarAdminUrl;
private Boolean installUserCodeDependencies;
private Map<String, String> customLabels;
+ private Integer expectedMetricsCollectionInterval;
}
private KubernetesContainerFactory kubernetesContainerFactory;