You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/12 01:36:30 UTC

[pulsar] branch master updated: Metrics Collector Agent for Functions running in Kubernetes (#2773)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 89ccde0  Metrics Collector Agent for Functions running in Kubernetes (#2773)
89ccde0 is described below

commit 89ccde01617366ae1c5a6950c883b7aacac195e9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Oct 11 18:36:25 2018 -0700

    Metrics Collector Agent for Functions running in Kubernetes (#2773)
    
    ### Motivation
    
    Added an agent that scrapes metrics from the function container and exposes them on a port for Prometheus to scrape.
---
 distribution/server/src/assemble/bin.xml           |   5 +
 pulsar-functions/metrics/pom.xml                   |  38 ++++++
 .../functions/metrics/PrometheusMetricsServer.java | 137 +++++++++++++++++++++
 .../functions/metrics/sink/AbstractWebSink.java    |   6 +-
 .../functions/runtime/KubernetesRuntime.java       |  96 +++++++++++++--
 .../runtime/KubernetesRuntimeFactory.java          |  11 +-
 .../functions/runtime/KubernetesRuntimeTest.java   |   2 +-
 .../functions/worker/FunctionRuntimeManager.java   |   3 +-
 .../pulsar/functions/worker/WorkerConfig.java      |   1 +
 9 files changed, 282 insertions(+), 17 deletions(-)

diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml
index 533c195..c9ad893 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -81,6 +81,11 @@
       <outputDirectory>instances</outputDirectory>
     </file>
     <file>
+      <source>${basedir}/../../pulsar-functions/metrics/target/PrometheusMetricsServer.jar</source>
+      <destName>PrometheusMetricsServer.jar</destName>
+      <outputDirectory>instances</outputDirectory>
+    </file>
+    <file>
       <source>${basedir}/../../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar</source>
       <destName>api-examples.jar</destName>
       <outputDirectory>examples</outputDirectory>
diff --git a/pulsar-functions/metrics/pom.xml b/pulsar-functions/metrics/pom.xml
index afa12cc..ca5e0a5 100644
--- a/pulsar-functions/metrics/pom.xml
+++ b/pulsar-functions/metrics/pom.xml
@@ -45,6 +45,44 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+    </dependency>
+
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin> <!-- Packages PrometheusMetricsServer.jar with its dependencies bundled (jar-with-dependencies) -->
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <finalName>
+                PrometheusMetricsServer
+              </finalName>
+              <archive>
+                <manifest> <!-- Main-Class must match the package declared in PrometheusMetricsServer.java -->
+                  <mainClass>
+                    org.apache.pulsar.functions.metrics.PrometheusMetricsServer
+                  </mainClass>
+                </manifest>
+              </archive>
+              <descriptorRefs>
+                <descriptorRef>jar-with-dependencies</descriptorRef>
+              </descriptorRefs>
+              <appendAssemblyId>false</appendAssemblyId>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
new file mode 100644
index 0000000..ef9f0ce
--- /dev/null
+++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.metrics;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.metrics.sink.AbstractWebSink;
+import org.apache.pulsar.functions.metrics.sink.PrometheusSink;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.*;
+
+/**
+ * An agent that fetches metrics from a function instance over gRPC and publishes them through a Prometheus sink.
+ */
+@Slf4j
+public class PrometheusMetricsServer {
+    @Parameter(names = "--function_details", description = "Function details json\n", required = true)
+    protected String functionDetailsJsonString;
+
+    @Parameter(names = "--prometheus_port", description = "Port to listen for prometheus requests\n", required = true)
+    protected int prometheusPort;
+
+    @Parameter(names = "--grpc_port", description = "GRPC Port to query the metrics from instance\n", required = true)
+    protected int grpc_port;
+
+    @Parameter(names = "--collection_interval", description = "Number in seconds between collection interval\n", required = true)
+    protected int metricsCollectionInterval;
+
+    private FunctionDetails functionDetails;
+    private MetricsSink metricsSink;
+    private ManagedChannel channel;
+    private InstanceControlGrpc.InstanceControlFutureStub stub;
+    private ScheduledExecutorService timer;
+
+    public PrometheusMetricsServer() { }
+
+
+    public void start() throws Exception {
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        if (functionDetailsJsonString.charAt(0) == '\'') {
+            functionDetailsJsonString = functionDetailsJsonString.substring(1);
+        }
+        if (functionDetailsJsonString.charAt(functionDetailsJsonString.length() - 1) == '\'') {
+            functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
+        }
+        JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
+        functionDetails = functionDetailsBuilder.build();
+
+        metricsSink = new PrometheusSink();
+        Map<String, String> config = new HashMap<>();
+        config.put(AbstractWebSink.KEY_PATH, "/metrics");
+        config.put(AbstractWebSink.KEY_PORT, String.valueOf(prometheusPort));
+        metricsSink.init(config);
+
+        channel = ManagedChannelBuilder.forAddress("127.0.0.1", grpc_port)
+                .usePlaintext(true)
+                .build();
+        stub = InstanceControlGrpc.newFutureStub(channel);
+
+        if (metricsCollectionInterval > 0) {
+            timer = Executors.newSingleThreadScheduledExecutor();
+            timer.scheduleAtFixedRate(new TimerTask() {
+                @Override
+                public void run() {
+                    CompletableFuture<InstanceCommunication.MetricsData> result = getMetrics();
+                    try {
+                        metricsSink.processRecord(result.get(), functionDetails);
+                    } catch (Exception e) {
+                        log.error("Getting metrics data failed {}/{}/{}",
+                                functionDetails.getTenant(),
+                                functionDetails.getNamespace(),
+                                functionDetails.getName(),
+                                e);
+                    }
+                }
+            }, metricsCollectionInterval, metricsCollectionInterval, TimeUnit.SECONDS);
+        }
+    }
+
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
+        ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(10, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                retval.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.MetricsData t) {
+                retval.complete(t);
+            }
+        });
+        return retval;
+    }
+
+    public static void main(String[] args) throws Exception {
+        PrometheusMetricsServer server = new PrometheusMetricsServer();
+        JCommander jcommander = new JCommander(server);
+        jcommander.setProgramName("PrometheusMetricsServer");
+
+        // parse args by JCommander
+        jcommander.parse(args);
+        server.start();
+    }
+}
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
index 86f17d7..cb5541a 100644
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
+++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
@@ -39,16 +39,16 @@ import org.apache.pulsar.functions.metrics.MetricsSink;
 /**
  * A metrics sink that publishes metrics on a http endpoint
  */
-abstract class AbstractWebSink implements MetricsSink {
+abstract public class AbstractWebSink implements MetricsSink {
     private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName());
 
     private static final int HTTP_STATUS_OK = 200;
 
     // Metrics will be published on http://host:port/path, the port
-    private static final String KEY_PORT = "port";
+    public static final String KEY_PORT = "port";
 
     // The path
-    private static final String KEY_PATH = "path";
+    public static final String KEY_PATH = "path";
 
     // Maximum number of metrics getting served
     private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index a9e8c75..40ef003 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
 import com.squareup.okhttp.Response;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -34,6 +35,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -62,6 +64,9 @@ class KubernetesRuntime implements Runtime {
     private static final String ENV_SHARD_ID = "SHARD_ID";
     private static final int maxJobNameSize = 55;
     private static final Integer GRPC_PORT = 9093;
+    private static final Integer PROMETHEUS_PORT = 9094;
+    private static final Double prometheusMetricsServerCpu = 0.1;
+    private static final Long prometheusMetricsServerRam = 250000000l;
     public static final Pattern VALID_POD_NAME_REGEX =
             Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
                     Pattern.CASE_INSENSITIVE);
@@ -80,6 +85,7 @@ class KubernetesRuntime implements Runtime {
     // The thread that invokes the function
     @Getter
     private List<String> processArgs;
+    private List<String> prometheusMetricsServerArgs;
     @Getter
     private ManagedChannel[] channel;
     private InstanceControlGrpc.InstanceControlFutureStub[] stub;
@@ -103,13 +109,15 @@ class KubernetesRuntime implements Runtime {
                       String pulsarRootDir,
                       InstanceConfig instanceConfig,
                       String instanceFile,
+                      String prometheusMetricsServerJarFile,
                       String logDirectory,
                       String userCodePkgUrl,
                       String originalCodeFileName,
                       String pulsarServiceUrl,
                       String pulsarAdminUrl,
                       String stateStorageServiceUrl,
-                      AuthenticationConfig authConfig) throws Exception {
+                      AuthenticationConfig authConfig,
+                      Integer expectedMetricsInterval) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
         this.instanceConfig = instanceConfig;
@@ -122,6 +130,7 @@ class KubernetesRuntime implements Runtime {
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, pulsarRootDir + "/conf/log4j2.yaml", installUserCodeDependencies);
+        this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
     }
@@ -379,6 +388,14 @@ class KubernetesRuntime implements Runtime {
         );
     }
 
+    protected List<String> getPrometheusMetricsServerCommand() {
+        return Arrays.asList(
+                "sh",
+                "-c",
+                String.join(" ", prometheusMetricsServerArgs)
+        );
+    }
+
     private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
         return Arrays.asList(
                 pulsarRootDir + "/bin/pulsar-admin",
@@ -428,10 +445,7 @@ class KubernetesRuntime implements Runtime {
 
         // set up pod meta
         final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(instanceConfig.getFunctionDetails()));
-        /*
-        TODO:- Figure out the metrics collection later.
         templateMetaData.annotations(getPrometheusAnnotations());
-        */
         podTemplateSpec.setMetadata(templateMetaData);
 
         final List<String> command = getExecutorCommand();
@@ -447,7 +461,7 @@ class KubernetesRuntime implements Runtime {
     private Map<String, String> getPrometheusAnnotations() {
         final Map<String, String> annotations = new HashMap<>();
         annotations.put("prometheus.io/scrape", "true");
-        annotations.put("prometheus.io/port", "8080");
+        annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT));
         return annotations;
     }
 
@@ -472,8 +486,10 @@ class KubernetesRuntime implements Runtime {
         // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
         podSpec.setTolerations(getTolerations());
 
-        podSpec.containers(Collections.singletonList(
-                getContainer(instanceCommand, resource)));
+        List<V1Container> containers = new LinkedList<>();
+        containers.add(getFunctionContainer(instanceCommand, resource));
+        containers.add(getPrometheusContainer());
+        podSpec.containers(containers);
 
         return podSpec;
     }
@@ -493,7 +509,7 @@ class KubernetesRuntime implements Runtime {
         return tolerations;
     }
 
-    private V1Container getContainer(List<String> instanceCommand, Function.Resources resource) {
+    private V1Container getFunctionContainer(List<String> instanceCommand, Function.Resources resource) {
         final V1Container container = new V1Container().name("pulsarfunction");
 
         // set up the container images
@@ -520,12 +536,44 @@ class KubernetesRuntime implements Runtime {
         container.setResources(resourceRequirements);
 
         // set container ports
-        container.setPorts(getContainerPorts());
+        container.setPorts(getFunctionContainerPorts());
+
+        return container;
+    }
+
+    private V1Container getPrometheusContainer() {
+        final V1Container container = new V1Container().name("prometheusmetricsserver");
+
+        // set up the container images
+        container.setImage(pulsarDockerImageName);
+
+        // set up the container command
+        container.setCommand(getPrometheusMetricsServerCommand());
+
+        // setup the environment variables for the container
+        final V1EnvVar envVarPodName = new V1EnvVar();
+        envVarPodName.name("POD_NAME")
+                .valueFrom(new V1EnvVarSource()
+                        .fieldRef(new V1ObjectFieldSelector()
+                                .fieldPath("metadata.name")));
+        container.setEnv(Arrays.asList(envVarPodName));
+
+
+        // set container resources
+        final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
+        final Map<String, Quantity> requests = new HashMap<>();
+        requests.put("memory", Quantity.fromString(Long.toString(prometheusMetricsServerRam)));
+        requests.put("cpu", Quantity.fromString(Double.toString(prometheusMetricsServerCpu)));
+        resourceRequirements.setRequests(requests);
+        container.setResources(resourceRequirements);
+
+        // set container ports
+        container.setPorts(getPrometheusContainerPorts());
 
         return container;
     }
 
-    private List<V1ContainerPort> getContainerPorts() {
+    private List<V1ContainerPort> getFunctionContainerPorts() {
         List<V1ContainerPort> ports = new ArrayList<>();
         final V1ContainerPort port = new V1ContainerPort();
         port.setName("grpc");
@@ -534,6 +582,15 @@ class KubernetesRuntime implements Runtime {
         return ports;
     }
 
+    private List<V1ContainerPort> getPrometheusContainerPorts() {
+        List<V1ContainerPort> ports = new ArrayList<>();
+        final V1ContainerPort port = new V1ContainerPort();
+        port.setName("prometheus");
+        port.setContainerPort(PROMETHEUS_PORT);
+        ports.add(port);
+        return ports;
+    }
+
     private static String createJobName(Function.FunctionDetails functionDetails) {
         return createJobName(functionDetails.getTenant(),
                 functionDetails.getNamespace(),
@@ -557,4 +614,23 @@ class KubernetesRuntime implements Runtime {
             throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize);
         }
     }
+
+    private List<String> composePrometheusMetricsServerArgs(String prometheusMetricsServerFile,
+                                                            Integer expectedMetricsInterval) throws Exception {
+        List<String> args = new LinkedList<>();
+        args.add("java");
+        args.add("-cp");
+        args.add(prometheusMetricsServerFile);
+        args.add("-Xmx" + String.valueOf(prometheusMetricsServerRam));
+        args.add(PrometheusMetricsServer.class.getName());
+        args.add("--function_details");
+        args.add("'" + JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails()) + "'");
+        args.add("--prometheus_port");
+        args.add(String.valueOf(PROMETHEUS_PORT));
+        args.add("--grpc_port");
+        args.add(String.valueOf(GRPC_PORT));
+        args.add("--collection_interval");
+        args.add(String.valueOf(expectedMetricsInterval));
+        return args;
+    }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index dee265f..b257cbf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -53,7 +53,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     private final AuthenticationConfig authConfig;
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
+    private final String prometheusMetricsServerJarFile;
     private final String logDirectory = "logs/functions";
+    private final Integer expectedMetricsInterval;
     private AppsV1Api appsClient;
     private CoreV1Api coreClient;
 
@@ -68,7 +70,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                                     String pulsarServiceUri,
                                     String pulsarAdminUri,
                                     String stateStorageServiceUri,
-                                    AuthenticationConfig authConfig) {
+                                    AuthenticationConfig authConfig,
+                                    Integer expectedMetricsInterval) {
         this.k8Uri = k8Uri;
         if (!isEmpty(jobNamespace)) {
             this.jobNamespace = jobNamespace;
@@ -94,6 +97,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         this.authConfig = authConfig;
         this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
         this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
+        this.prometheusMetricsServerJarFile = this.pulsarRootDir + "/instances/PrometheusMetricsServer.jar";
+        this.expectedMetricsInterval = expectedMetricsInterval == null ? -1 : expectedMetricsInterval;
     }
 
     @Override
@@ -127,13 +132,15 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             pulsarRootDir,
             instanceConfig,
             instanceFile,
+            prometheusMetricsServerJarFile,
             logDirectory,
             codePkgUrl,
             originalCodeFileName,
             pulsarServiceUri,
             pulsarAdminUri,
             stateStorageServiceUri,
-            authConfig);
+            authConfig,
+            expectedMetricsInterval);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 11e240a..895a1e8 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public class KubernetesRuntimeTest {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
-            false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null));
+            false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
         doNothing().when(this.factory).setupClient();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 47d317f..2514ff6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -130,7 +130,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
                     workerConfig.getStateStorageServiceUrl(),
-                    authConfig);
+                    authConfig,
+                    workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval());
         } else {
             throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index c18f824..bc97969 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -141,6 +141,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
         private String pulsarAdminUrl;
         private Boolean installUserCodeDependencies;
         private Map<String, String> customLabels;
+        private Integer expectedMetricsCollectionInterval;
     }
     private KubernetesContainerFactory kubernetesContainerFactory;