You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/27 05:07:58 UTC

[pulsar] branch master updated: Added ability for the kubernetes to poll a configmap to look out for changes (#2856)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d208c5  Added ability for the kubernetes to poll a configmap to look out for changes (#2856)
2d208c5 is described below

commit 2d208c565e3bd25a1cc12d904f72b91f96ec6cab
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 26 22:07:54 2018 -0700

    Added ability for the kubernetes to poll a configmap to look out for changes (#2856)
    
    ### Motivation
    While running the functions worker in Kubernetes, it can be hard to update any of the worker's config without killing it. This PR allows certain properties of the functions runtime to be stored in a ConfigMap. The KubernetesRuntimeFactory will keep an eye out for any changes and apply them on the fly, dramatically simplifying config updates.
---
 .../runtime/KubernetesRuntimeFactory.java          | 126 +++++++++++++++------
 .../functions/runtime/KubernetesRuntimeTest.java   |   4 +-
 .../functions/worker/FunctionRuntimeManager.java   |   4 +-
 .../pulsar/functions/worker/WorkerConfig.java      |   5 +
 4 files changed, 101 insertions(+), 38 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 1a180ae..c6d5d02 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -24,13 +24,20 @@ import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.apis.AppsV1Api;
 import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1ConfigMap;
 import io.kubernetes.client.util.Config;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 
+import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
@@ -40,24 +47,33 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 @Slf4j
 public class KubernetesRuntimeFactory implements RuntimeFactory {
 
-    private final String k8Uri;
-    private final String jobNamespace;
-    private final String pulsarDockerImageName;
-    private final String pulsarRootDir;
+    @Getter
+    @Setter
+    @NoArgsConstructor
+    class KubernetesInfo {
+        private String k8Uri;
+        private String jobNamespace;
+        private String pulsarDockerImageName;
+        private String pulsarRootDir;
+        private String pulsarAdminUrl;
+        private String pulsarServiceUrl;
+        private String pythonDependencyRepository;
+        private String pythonExtraDependencyRepository;
+        private String changeConfigMap;
+        private String changeConfigMapNamespace;
+    }
+    private final KubernetesInfo kubernetesInfo;
     private final Boolean submittingInsidePod;
     private final Boolean installUserCodeDependencies;
-    private final String pythonDependencyRepository;
-    private final String pythonExtraDependencyRepository;
     private final Map<String, String> customLabels;
-    private final String pulsarAdminUri;
-    private final String pulsarServiceUri;
+    private final Integer expectedMetricsCollectionInterval;
     private final String stateStorageServiceUri;
     private final AuthenticationConfig authConfig;
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
     private final String prometheusMetricsServerJarFile;
     private final String logDirectory = "logs/functions";
-    private final Integer expectedMetricsInterval;
+    private Timer changeConfigMapTimer;
     private AppsV1Api appsClient;
     private CoreV1Api coreClient;
 
@@ -75,36 +91,41 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                                     String pulsarAdminUri,
                                     String stateStorageServiceUri,
                                     AuthenticationConfig authConfig,
-                                    Integer expectedMetricsInterval) {
-        this.k8Uri = k8Uri;
+                                    Integer expectedMetricsCollectionInterval,
+                                    String changeConfigMap,
+                                    String changeConfigMapNamespace) {
+        this.kubernetesInfo = new KubernetesInfo();
+        this.kubernetesInfo.setK8Uri(k8Uri);
         if (!isEmpty(jobNamespace)) {
-            this.jobNamespace = jobNamespace;
+            this.kubernetesInfo.setJobNamespace(jobNamespace);
         } else {
-            this.jobNamespace = "default";
+            this.kubernetesInfo.setJobNamespace("default");
         }
         if (!isEmpty(pulsarDockerImageName)) {
-            this.pulsarDockerImageName = pulsarDockerImageName;
+            this.kubernetesInfo.setPulsarDockerImageName(pulsarDockerImageName);
         } else {
-            this.pulsarDockerImageName = "apachepulsar/pulsar";
+            this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar");
         }
         if (!isEmpty(pulsarRootDir)) {
-            this.pulsarRootDir = pulsarRootDir;
+            this.kubernetesInfo.setPulsarRootDir(pulsarRootDir);
         } else {
-            this.pulsarRootDir = "/pulsar";
+            this.kubernetesInfo.setPulsarRootDir("/pulsar");
         }
+        this.kubernetesInfo.setPythonDependencyRepository(pythonDependencyRepository);
+        this.kubernetesInfo.setPythonExtraDependencyRepository(pythonExtraDependencyRepository);
+        this.kubernetesInfo.setPulsarServiceUrl(pulsarServiceUri);
+        this.kubernetesInfo.setPulsarAdminUrl(pulsarAdminUri);
+        this.kubernetesInfo.setChangeConfigMap(changeConfigMap);
+        this.kubernetesInfo.setChangeConfigMapNamespace(changeConfigMapNamespace);
         this.submittingInsidePod = submittingInsidePod;
         this.installUserCodeDependencies = installUserCodeDependencies;
-        this.pythonDependencyRepository = pythonDependencyRepository;
-        this.pythonExtraDependencyRepository = pythonExtraDependencyRepository;
         this.customLabels = customLabels;
-        this.pulsarServiceUri = pulsarServiceUri;
-        this.pulsarAdminUri = pulsarAdminUri;
         this.stateStorageServiceUri = stateStorageServiceUri;
         this.authConfig = authConfig;
-        this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
-        this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
-        this.prometheusMetricsServerJarFile = this.pulsarRootDir + "/instances/PrometheusMetricsServer.jar";
-        this.expectedMetricsInterval = expectedMetricsInterval == null ? -1 : expectedMetricsInterval;
+        this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
+        this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
+        this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar";
+        this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
     }
 
     @Override
@@ -131,24 +152,24 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         return new KubernetesRuntime(
             appsClient,
             coreClient,
-            jobNamespace,
+            this.kubernetesInfo.getJobNamespace(),
             customLabels,
             installUserCodeDependencies,
-            pythonDependencyRepository,
-            pythonExtraDependencyRepository,
-            pulsarDockerImageName,
-            pulsarRootDir,
+            this.kubernetesInfo.getPythonDependencyRepository(),
+            this.kubernetesInfo.getPythonExtraDependencyRepository(),
+            this.kubernetesInfo.getPulsarDockerImageName(),
+            this.kubernetesInfo.getPulsarRootDir(),
             instanceConfig,
             instanceFile,
             prometheusMetricsServerJarFile,
             logDirectory,
             codePkgUrl,
             originalCodeFileName,
-            pulsarServiceUri,
-            pulsarAdminUri,
+            this.kubernetesInfo.getPulsarServiceUrl(),
+            this.kubernetesInfo.getPulsarAdminUrl(),
             stateStorageServiceUri,
             authConfig,
-            expectedMetricsInterval);
+                expectedMetricsCollectionInterval);
     }
 
     @Override
@@ -163,7 +184,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     @VisibleForTesting
     void setupClient() throws Exception {
         if (appsClient == null) {
-            if (k8Uri == null) {
+            if (this.kubernetesInfo.getK8Uri() == null) {
                 log.info("k8Uri is null thus going by defaults");
                 ApiClient cli;
                 if (submittingInsidePod) {
@@ -177,11 +198,44 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                 appsClient = new AppsV1Api();
                 coreClient = new CoreV1Api();
             } else {
-                log.info("Setting up k8Client using uri " + k8Uri);
-                final ApiClient apiClient = new ApiClient().setBasePath(k8Uri);
+                log.info("Setting up k8Client using uri " + this.kubernetesInfo.getK8Uri());
+                final ApiClient apiClient = new ApiClient().setBasePath(this.kubernetesInfo.getK8Uri());
                 appsClient = new AppsV1Api(apiClient);
                 coreClient = new CoreV1Api(apiClient);
             }
+
+            // Setup a timer to change stuff.
+            if (!isEmpty(this.kubernetesInfo.getChangeConfigMap())) {
+                changeConfigMapTimer = new Timer();
+                changeConfigMapTimer.scheduleAtFixedRate(new TimerTask() {
+                    @Override
+                    public void run() {
+                        fetchConfigMap();
+                    }
+                }, 300000, 300000);
+            }
+        }
+    }
+
+    void fetchConfigMap() {
+        try {
+            V1ConfigMap v1ConfigMap = coreClient.readNamespacedConfigMap(kubernetesInfo.getChangeConfigMap(), kubernetesInfo.getChangeConfigMapNamespace(), null, true, false);
+            Map<String, String> data = v1ConfigMap.getData();
+            if (data != null) {
+                overRideKubernetesConfig(data);
+            }
+        } catch (Exception e) {
+            log.error("Error while trying to fetch configmap {} at namespace {}", kubernetesInfo.getChangeConfigMap(), kubernetesInfo.getChangeConfigMapNamespace(), e);
+        }
+    }
+
+    void overRideKubernetesConfig(Map<String, String> data) throws Exception {
+        for (Field field : KubernetesInfo.class.getDeclaredFields()) {
+            field.setAccessible(true);
+            if (data.containsKey(field.getName()) && !data.get(field.getName()).equals(field.get(kubernetesInfo))) {
+                log.info("Kubernetes Config {} changed from {} to {}", field.getName(), field.get(kubernetesInfo), data.get(field.getName()));
+                field.set(kubernetesInfo, data.get(field.getName()));
+            }
         }
     }
 }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index e82c75f..0288660 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,9 @@ public class KubernetesRuntimeTest {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
-            false, true, "myrepo", "anotherrepo",  null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
+            false, true, "myrepo", "anotherrepo",
+                null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null,
+                null, null, null));
         doNothing().when(this.factory).setupClient();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f7dd4bc..d8bf252 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -140,7 +140,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
                     workerConfig.getStateStorageServiceUrl(),
                     authConfig,
-                    workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval());
+                    workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
+                    workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
+                    workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace());
         } else {
             throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 2b3e816..587f311 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -144,6 +144,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
         private String pythonExtraDependencyRepository;
         private Map<String, String> customLabels;
         private Integer expectedMetricsCollectionInterval;
+        // The Kubernetes runtime periodically checks back on this
+        // ConfigMap, if defined, and applies any changes it finds
+        // to the Kubernetes-specific settings.
+        private String changeConfigMap;
+        private String changeConfigMapNamespace;
     }
     private KubernetesContainerFactory kubernetesContainerFactory;