You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/12 01:36:27 UTC

[GitHub] sijie closed pull request #2773: Metrics Collector Agent for Functions running in Kubernetes

sijie closed pull request #2773: Metrics Collector Agent for Functions running in Kubernetes
URL: https://github.com/apache/pulsar/pull/2773
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml
index 533c1951a8..c9ad893d56 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -80,6 +80,11 @@
       <destName>java-instance.jar</destName>
       <outputDirectory>instances</outputDirectory>
     </file>
+    <file>
+      <source>${basedir}/../../pulsar-functions/metrics/target/PrometheusMetricsServer.jar</source>
+      <destName>PrometheusMetricsServer.jar</destName>
+      <outputDirectory>instances</outputDirectory>
+    </file>
     <file>
       <source>${basedir}/../../pulsar-functions/java-examples/target/pulsar-functions-api-examples.jar</source>
       <destName>api-examples.jar</destName>
diff --git a/pulsar-functions/metrics/pom.xml b/pulsar-functions/metrics/pom.xml
index afa12cc525..ca5e0a5d55 100644
--- a/pulsar-functions/metrics/pom.xml
+++ b/pulsar-functions/metrics/pom.xml
@@ -45,6 +45,44 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+    </dependency>
+
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <finalName>
+                PrometheusMetricsServer
+              </finalName>
+              <archive>
+                <manifest>
+                  <!-- Must match the package declared in PrometheusMetricsServer.java
+                       (org.apache.pulsar.functions.metrics), otherwise java -jar cannot load it. -->
+                  <mainClass>org.apache.pulsar.functions.metrics.PrometheusMetricsServer</mainClass>
+                </manifest>
+              </archive>
+              <descriptorRefs>
+                <descriptorRef>jar-with-dependencies</descriptorRef>
+              </descriptorRefs>
+              <appendAssemblyId>false</appendAssemblyId>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
new file mode 100644
index 0000000000..ef9f0cec63
--- /dev/null
+++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/PrometheusMetricsServer.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.metrics;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.metrics.sink.AbstractWebSink;
+import org.apache.pulsar.functions.metrics.sink.PrometheusSink;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.*;
+
+/**
+ * Standalone server that polls a function instance for metrics over gRPC and publishes them through a Prometheus sink.
+ */
+@Slf4j
+public class PrometheusMetricsServer {
+    @Parameter(names = "--function_details", description = "Function details json\n", required = true)
+    protected String functionDetailsJsonString;
+
+    @Parameter(names = "--prometheus_port", description = "Port to listen for prometheus requests\n", required = true)
+    protected int prometheusPort;
+
+    @Parameter(names = "--grpc_port", description = "GRPC Port to query the metrics from instance\n", required = true)
+    protected int grpc_port;
+
+    @Parameter(names = "--collection_interval", description = "Number in seconds between collection interval\n", required = true)
+    protected int metricsCollectionInterval;
+
+    private FunctionDetails functionDetails;
+    private MetricsSink metricsSink;
+    private ManagedChannel channel;
+    private InstanceControlGrpc.InstanceControlFutureStub stub;
+    private ScheduledExecutorService timer;
+
+    public PrometheusMetricsServer() { }
+
+    /**
+     * Parses the function details, initializes the Prometheus sink and, when
+     * {@code metricsCollectionInterval} is positive, schedules periodic metrics collection.
+     * @throws Exception if parsing the function details JSON or initializing the sink fails
+     */
+    public void start() throws Exception {
+        // Drop the single quotes the launcher wraps around the JSON; safe on empty input.
+        if (functionDetailsJsonString.startsWith("'")) {
+            functionDetailsJsonString = functionDetailsJsonString.substring(1);
+        }
+        if (functionDetailsJsonString.endsWith("'")) {
+            functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
+        }
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
+        functionDetails = functionDetailsBuilder.build();
+
+        metricsSink = new PrometheusSink();
+        Map<String, String> config = new HashMap<>();
+        config.put(AbstractWebSink.KEY_PATH, "/metrics");
+        config.put(AbstractWebSink.KEY_PORT, String.valueOf(prometheusPort));
+        metricsSink.init(config);
+
+        channel = ManagedChannelBuilder.forAddress("127.0.0.1", grpc_port).usePlaintext(true).build();
+        stub = InstanceControlGrpc.newFutureStub(channel);
+
+        if (metricsCollectionInterval > 0) {
+            timer = Executors.newSingleThreadScheduledExecutor();
+            timer.scheduleAtFixedRate(this::collectMetrics, metricsCollectionInterval, metricsCollectionInterval, TimeUnit.SECONDS);
+        }
+    }
+
+    /** Fetches one metrics snapshot over gRPC and passes it to the sink; failures are only logged. */
+    private void collectMetrics() {
+        try {
+            metricsSink.processRecord(getMetrics().get(), functionDetails);
+        } catch (Exception e) {
+            // Swallow after logging: an exception escaping the task suppresses all later runs.
+            log.error("Getting metrics data failed {}/{}/{}", functionDetails.getTenant(),
+                    functionDetails.getNamespace(), functionDetails.getName(), e);
+        }
+    }
+
+    /** Calls {@code getAndResetMetrics} on the instance (10 second deadline); the future fails if the call fails. */
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
+        ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(10, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                retval.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.MetricsData t) {
+                retval.complete(t);
+            }
+        });
+        return retval;
+    }
+
+    /** Command-line entry point: binds the arguments with JCommander and starts the server. */
+    public static void main(String[] args) throws Exception {
+        PrometheusMetricsServer server = new PrometheusMetricsServer();
+        JCommander jcommander = new JCommander(server);
+        jcommander.setProgramName("PrometheusMetricsServer");
+        jcommander.parse(args);
+        server.start();
+    }
+}
diff --git a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
index 86f17d7db4..cb5541a462 100644
--- a/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
+++ b/pulsar-functions/metrics/src/main/java/org/apache/pulsar/functions/metrics/sink/AbstractWebSink.java
@@ -39,16 +39,16 @@
 /**
  * A metrics sink that publishes metrics on a http endpoint
  */
-abstract class AbstractWebSink implements MetricsSink {
+abstract public class AbstractWebSink implements MetricsSink {
     private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName());
 
     private static final int HTTP_STATUS_OK = 200;
 
     // Metrics will be published on http://host:port/path, the port
-    private static final String KEY_PORT = "port";
+    public static final String KEY_PORT = "port";
 
     // The path
-    private static final String KEY_PATH = "path";
+    public static final String KEY_PATH = "path";
 
     // Maximum number of metrics getting served
     private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index a9e8c75b16..40ef003a14 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -23,6 +23,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.Empty;
+import com.google.protobuf.util.JsonFormat;
 import com.squareup.okhttp.Response;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -34,6 +35,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -62,6 +64,9 @@
     private static final String ENV_SHARD_ID = "SHARD_ID";
     private static final int maxJobNameSize = 55;
     private static final Integer GRPC_PORT = 9093;
+    private static final Integer PROMETHEUS_PORT = 9094; // passed as --prometheus_port and exposed as the container port
+    private static final Double prometheusMetricsServerCpu = 0.1; // "cpu" resource request of the metrics container
+    private static final Long prometheusMetricsServerRam = 250000000L; // "memory" request and -Xmx value of the metrics server
     public static final Pattern VALID_POD_NAME_REGEX =
             Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
                     Pattern.CASE_INSENSITIVE);
@@ -80,6 +85,7 @@
     // The thread that invokes the function
     @Getter
     private List<String> processArgs;
+    private List<String> prometheusMetricsServerArgs;
     @Getter
     private ManagedChannel[] channel;
     private InstanceControlGrpc.InstanceControlFutureStub[] stub;
@@ -103,13 +109,15 @@
                       String pulsarRootDir,
                       InstanceConfig instanceConfig,
                       String instanceFile,
+                      String prometheusMetricsServerJarFile,
                       String logDirectory,
                       String userCodePkgUrl,
                       String originalCodeFileName,
                       String pulsarServiceUrl,
                       String pulsarAdminUrl,
                       String stateStorageServiceUrl,
-                      AuthenticationConfig authConfig) throws Exception {
+                      AuthenticationConfig authConfig,
+                      Integer expectedMetricsInterval) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
         this.instanceConfig = instanceConfig;
@@ -122,6 +130,7 @@
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, pulsarRootDir + "/conf/log4j2.yaml", installUserCodeDependencies);
+        this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
     }
@@ -379,6 +388,14 @@ public void deleteService() throws Exception {
         );
     }
 
+    protected List<String> getPrometheusMetricsServerCommand() {
+        return Arrays.asList(
+                "sh",
+                "-c",
+                String.join(" ", prometheusMetricsServerArgs)
+        );
+    }
+
     private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
         return Arrays.asList(
                 pulsarRootDir + "/bin/pulsar-admin",
@@ -428,10 +445,7 @@ private V1StatefulSet createStatefulSet() {
 
         // set up pod meta
         final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(instanceConfig.getFunctionDetails()));
-        /*
-        TODO:- Figure out the metrics collection later.
         templateMetaData.annotations(getPrometheusAnnotations());
-        */
         podTemplateSpec.setMetadata(templateMetaData);
 
         final List<String> command = getExecutorCommand();
@@ -447,7 +461,7 @@ private V1StatefulSet createStatefulSet() {
     private Map<String, String> getPrometheusAnnotations() {
         final Map<String, String> annotations = new HashMap<>();
         annotations.put("prometheus.io/scrape", "true");
-        annotations.put("prometheus.io/port", "8080");
+        annotations.put("prometheus.io/port", String.valueOf(PROMETHEUS_PORT));
         return annotations;
     }
 
@@ -472,8 +486,10 @@ private V1PodSpec getPodSpec(List<String> instanceCommand, Function.Resources re
         // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
         podSpec.setTolerations(getTolerations());
 
-        podSpec.containers(Collections.singletonList(
-                getContainer(instanceCommand, resource)));
+        List<V1Container> containers = new LinkedList<>();
+        containers.add(getFunctionContainer(instanceCommand, resource));
+        containers.add(getPrometheusContainer());
+        podSpec.containers(containers);
 
         return podSpec;
     }
@@ -493,7 +509,7 @@ private V1PodSpec getPodSpec(List<String> instanceCommand, Function.Resources re
         return tolerations;
     }
 
-    private V1Container getContainer(List<String> instanceCommand, Function.Resources resource) {
+    private V1Container getFunctionContainer(List<String> instanceCommand, Function.Resources resource) {
         final V1Container container = new V1Container().name("pulsarfunction");
 
         // set up the container images
@@ -520,12 +536,44 @@ private V1Container getContainer(List<String> instanceCommand, Function.Resource
         container.setResources(resourceRequirements);
 
         // set container ports
-        container.setPorts(getContainerPorts());
+        container.setPorts(getFunctionContainerPorts());
+
+        return container;
+    }
+
+    /**
+     * Builds the container that runs the Prometheus metrics server.
+     * @return the container with its image, command, POD_NAME variable, resource requests and ports set
+     */
+    private V1Container getPrometheusContainer() {
+        final V1Container container = new V1Container().name("prometheusmetricsserver");
+        container.setImage(pulsarDockerImageName);
+        container.setCommand(getPrometheusMetricsServerCommand());
+
+        // Expose the pod's own name to the process as POD_NAME via a metadata.name field reference.
+        final V1ObjectFieldSelector podNameSelector = new V1ObjectFieldSelector().fieldPath("metadata.name");
+        final V1EnvVarSource podNameSource = new V1EnvVarSource().fieldRef(podNameSelector);
+        final V1EnvVar podNameVar = new V1EnvVar();
+        podNameVar.name("POD_NAME").valueFrom(podNameSource);
+        container.setEnv(Arrays.asList(podNameVar));
+
+        // Resource requests come from the fixed metrics-server RAM and CPU constants.
+        final Map<String, Quantity> requestedResources = new HashMap<>();
+        final String memoryRequest = Long.toString(prometheusMetricsServerRam);
+        final String cpuRequest = Double.toString(prometheusMetricsServerCpu);
+        requestedResources.put("memory", Quantity.fromString(memoryRequest));
+        requestedResources.put("cpu", Quantity.fromString(cpuRequest));
+        final V1ResourceRequirements requirements = new V1ResourceRequirements();
+        requirements.setRequests(requestedResources);
+        container.setResources(requirements);
+
+        // Declare the Prometheus port on the container.
+        container.setPorts(getPrometheusContainerPorts());
 
         return container;
     }
 
-    private List<V1ContainerPort> getContainerPorts() {
+    private List<V1ContainerPort> getFunctionContainerPorts() {
         List<V1ContainerPort> ports = new ArrayList<>();
         final V1ContainerPort port = new V1ContainerPort();
         port.setName("grpc");
@@ -534,6 +582,15 @@ private V1Container getContainer(List<String> instanceCommand, Function.Resource
         return ports;
     }
 
+    /** Returns a one-element list with the container port named "prometheus" on PROMETHEUS_PORT. */
+    private List<V1ContainerPort> getPrometheusContainerPorts() {
+        final V1ContainerPort prometheusPort = new V1ContainerPort();
+        prometheusPort.setName("prometheus");
+        prometheusPort.setContainerPort(PROMETHEUS_PORT);
+        // Copy into an ArrayList so the returned list stays a mutable ArrayList, as before.
+        return new ArrayList<>(Arrays.asList(prometheusPort));
+    }
+
     private static String createJobName(Function.FunctionDetails functionDetails) {
         return createJobName(functionDetails.getTenant(),
                 functionDetails.getNamespace(),
@@ -557,4 +614,23 @@ public static void doChecks(Function.FunctionDetails functionDetails) {
             throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize);
         }
     }
+
+    /**
+     * Composes the shell words that launch {@link PrometheusMetricsServer}; they are later joined
+     * with spaces and run through {@code sh -c}, so the JSON argument must be shell-quoted.
+     */
+    private List<String> composePrometheusMetricsServerArgs(String prometheusMetricsServerFile,
+                                                            Integer expectedMetricsInterval) throws Exception {
+        String detailsJson = JsonFormat.printer().omittingInsignificantWhitespace()
+                .print(instanceConfig.getFunctionDetails());
+        // Close the quote, emit an escaped quote, reopen: keeps an embedded ' from ending the shell word.
+        String quotedJson = "'" + detailsJson.replace("'", "'\\''") + "'";
+        return new ArrayList<>(Arrays.asList(
+                "java", "-cp", prometheusMetricsServerFile, "-Xmx" + prometheusMetricsServerRam,
+                PrometheusMetricsServer.class.getName(),
+                "--function_details", quotedJson,
+                "--prometheus_port", String.valueOf(PROMETHEUS_PORT),
+                "--grpc_port", String.valueOf(GRPC_PORT),
+                "--collection_interval", String.valueOf(expectedMetricsInterval)));
+    }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index dee265f346..b257cbfe08 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -53,7 +53,9 @@
     private final AuthenticationConfig authConfig;
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
+    private final String prometheusMetricsServerJarFile;
     private final String logDirectory = "logs/functions";
+    private final Integer expectedMetricsInterval;
     private AppsV1Api appsClient;
     private CoreV1Api coreClient;
 
@@ -68,7 +70,8 @@ public KubernetesRuntimeFactory(String k8Uri,
                                     String pulsarServiceUri,
                                     String pulsarAdminUri,
                                     String stateStorageServiceUri,
-                                    AuthenticationConfig authConfig) {
+                                    AuthenticationConfig authConfig,
+                                    Integer expectedMetricsInterval) {
         this.k8Uri = k8Uri;
         if (!isEmpty(jobNamespace)) {
             this.jobNamespace = jobNamespace;
@@ -94,6 +97,8 @@ public KubernetesRuntimeFactory(String k8Uri,
         this.authConfig = authConfig;
         this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
         this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
+        this.prometheusMetricsServerJarFile = this.pulsarRootDir + "/instances/PrometheusMetricsServer.jar";
+        this.expectedMetricsInterval = expectedMetricsInterval == null ? -1 : expectedMetricsInterval;
     }
 
     @Override
@@ -127,13 +132,15 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
             pulsarRootDir,
             instanceConfig,
             instanceFile,
+            prometheusMetricsServerJarFile,
             logDirectory,
             codePkgUrl,
             originalCodeFileName,
             pulsarServiceUri,
             pulsarAdminUri,
             stateStorageServiceUri,
-            authConfig);
+            authConfig,
+            expectedMetricsInterval);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 11e240a4b1..895a1e85a5 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public KubernetesRuntimeTest() throws Exception {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
-            false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null));
+            false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
         doNothing().when(this.factory).setupClient();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 47d317fc94..2514ff6908 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -130,7 +130,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
                     workerConfig.getStateStorageServiceUrl(),
-                    authConfig);
+                    authConfig,
+                    workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval());
         } else {
             throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index c18f824714..bc97969116 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -141,6 +141,7 @@
         private String pulsarAdminUrl;
         private Boolean installUserCodeDependencies;
         private Map<String, String> customLabels;
+        private Integer expectedMetricsCollectionInterval;
     }
     private KubernetesContainerFactory kubernetesContainerFactory;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services