You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/16 06:56:44 UTC

[camel] 01/04: CAMEL-13978 - Create ConfigMap Watch feature in Kubernetes Component

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 57fc601efbbd27d70231582dddb277a9bd6eeed6
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 16 08:49:26 2019 +0200

    CAMEL-13978 - Create ConfigMap Watch feature in Kubernetes Component
---
 .../config_maps/KubernetesConfigMapsConsumer.java  | 132 +++++++++++++++++++++
 .../config_maps/KubernetesConfigMapsEndpoint.java  |   2 +-
 .../kubernetes/consumer/common/ConfigMapEvent.java |  47 ++++++++
 3 files changed, 180 insertions(+), 1 deletion(-)

diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
new file mode 100644
index 0000000..bbc489e
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.config_maps;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.consumer.common.ConfigMapEvent;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapList;
+import io.fabric8.kubernetes.api.model.DoneableConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+/**
+ * Consumer that opens a Kubernetes watch on ConfigMaps and hands every received
+ * event to the configured {@link Processor} as an exchange.
+ * <p>
+ * The exchange body is the affected {@link ConfigMap}; the
+ * {@link KubernetesConstants#KUBERNETES_EVENT_ACTION} header carries the watch action and
+ * {@link KubernetesConstants#KUBERNETES_EVENT_TIMESTAMP} the time the event was received.
+ */
+public class KubernetesConfigMapsConsumer extends DefaultConsumer {
+
+    private final Processor processor;
+    private ExecutorService executor;
+    private ConfigMapsConsumerTask configMapWatcher;
+
+    /**
+     * @param endpoint  the Kubernetes endpoint providing the client and the configuration
+     * @param processor the processor every ConfigMap event is delivered to
+     */
+    public KubernetesConfigMapsConsumer(AbstractKubernetesEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.processor = processor;
+    }
+
+    @Override
+    public AbstractKubernetesEndpoint getEndpoint() {
+        return (AbstractKubernetesEndpoint)super.getEndpoint();
+    }
+
+    /**
+     * Creates the executor from the endpoint and submits the task that opens the watch.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = getEndpoint().createExecutor();
+
+        configMapWatcher = new ConfigMapsConsumerTask();
+        executor.submit(configMapWatcher);
+    }
+
+    /**
+     * Closes the watch, if one has been opened, and shuts the executor down.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        log.debug("Stopping Kubernetes ConfigMap Consumer");
+        if (executor != null) {
+            try {
+                // The watch is assigned by the task on the executor thread, so it is still
+                // null when the consumer is stopped before the task got that far (or when
+                // opening the watch failed). Closing it blindly would throw a
+                // NullPointerException and skip the executor shutdown below.
+                Watch watch = configMapWatcher != null ? configMapWatcher.getWatch() : null;
+                if (watch != null) {
+                    watch.close();
+                }
+            } finally {
+                // Always release the executor, even when closing the watch fails.
+                if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
+                    getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+                } else {
+                    executor.shutdownNow();
+                }
+            }
+        }
+        executor = null;
+    }
+
+    /**
+     * Task submitted to the executor: opens the watch, optionally restricted by resource
+     * name or label, and turns every event into an exchange for the processor.
+     */
+    class ConfigMapsConsumerTask implements Runnable {
+
+        // Written by the executor thread in run(), read by the thread stopping the consumer.
+        private volatile Watch watch;
+
+        @Override
+        public void run() {
+            NonNamespaceOperation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> w = getEndpoint().getKubernetesClient().configMaps();
+            String labelKey = getEndpoint().getKubernetesConfiguration().getLabelKey();
+            String labelValue = getEndpoint().getKubernetesConfiguration().getLabelValue();
+            String resourceName = getEndpoint().getKubernetesConfiguration().getResourceName();
+
+            // The filter calls return the object to watch; their results used to be
+            // discarded, so the watch was opened on the unfiltered operation.
+            // NOTE(review): a resource name and a label selector are treated as alternatives
+            // here, with the name winning when both are configured - confirm this precedence.
+            io.fabric8.kubernetes.client.dsl.Watchable<Watch, Watcher<ConfigMap>> target = w;
+            if (ObjectHelper.isNotEmpty(resourceName)) {
+                target = w.withName(resourceName);
+            } else if (ObjectHelper.isNotEmpty(labelKey) && ObjectHelper.isNotEmpty(labelValue)) {
+                target = w.withLabel(labelKey, labelValue);
+            }
+
+            watch = target.watch(new Watcher<ConfigMap>() {
+
+                @Override
+                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, ConfigMap resource) {
+                    ConfigMapEvent de = new ConfigMapEvent(action, resource);
+                    Exchange exchange = getEndpoint().createExchange();
+                    exchange.getIn().setBody(de.getConfigMap());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, de.getAction());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        // Route processing failures to the consumer's exception handler so
+                        // one bad event does not propagate into the watch callback.
+                        getExceptionHandler().handleException("Error during processing", exchange, e);
+                    }
+                }
+
+                @Override
+                public void onClose(KubernetesClientException cause) {
+                    // A null cause is not logged; only closes that carry an exception are.
+                    if (cause != null) {
+                        log.error(cause.getMessage(), cause);
+                    }
+
+                }
+            });
+        }
+
+        /**
+         * @return the open watch, or {@code null} while the task has not opened one
+         */
+        public Watch getWatch() {
+            return watch;
+        }
+
+        public void setWatch(Watch watch) {
+            this.watch = watch;
+        }
+    }
+}
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java
index 616dbfc..26dc3c9 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java
@@ -43,7 +43,7 @@ public class KubernetesConfigMapsEndpoint extends AbstractKubernetesEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new IllegalArgumentException("The kubernetes-configmaps doesn't support consumer");
+        return new KubernetesConfigMapsConsumer(this, processor); // consumers are now supported: return the ConfigMap consumer instead of rejecting the call
     }
 
 }
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java
new file mode 100644
index 0000000..d225219
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer.common;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.Watcher.Action;
+
+/**
+ * Simple mutable holder pairing a watch {@link Action} with the {@link ConfigMap}
+ * it was reported for.
+ */
+public class ConfigMapEvent {
+
+    private Action action;
+    private ConfigMap configMap;
+
+    /**
+     * @param action    the watch action reported for the ConfigMap
+     * @param configMap the ConfigMap the action refers to
+     */
+    public ConfigMapEvent(Action action, ConfigMap configMap) {
+        this.action = action;
+        this.configMap = configMap;
+    }
+
+    /**
+     * @return the watch action stored in this event
+     */
+    public Action getAction() {
+        return this.action;
+    }
+
+    /**
+     * @param action the watch action to store in this event
+     */
+    public void setAction(Action action) {
+        this.action = action;
+    }
+
+    /**
+     * @return the ConfigMap stored in this event
+     */
+    public ConfigMap getConfigMap() {
+        return this.configMap;
+    }
+
+    /**
+     * @param configMap the ConfigMap to store in this event
+     */
+    public void setConfigMap(ConfigMap configMap) {
+        this.configMap = configMap;
+    }
+}