You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/16 06:56:44 UTC
[camel] 01/04: CAMEL-13978 - Create ConfigMap Watch feature in
Kubernetes Component
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 57fc601efbbd27d70231582dddb277a9bd6eeed6
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 16 08:49:26 2019 +0200
CAMEL-13978 - Create ConfigMap Watch feature in Kubernetes Component
---
.../config_maps/KubernetesConfigMapsConsumer.java | 132 +++++++++++++++++++++
.../config_maps/KubernetesConfigMapsEndpoint.java | 2 +-
.../kubernetes/consumer/common/ConfigMapEvent.java | 47 ++++++++
3 files changed, 180 insertions(+), 1 deletion(-)
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
new file mode 100644
index 0000000..bbc489e
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsConsumer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.config_maps;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.AbstractKubernetesEndpoint;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.consumer.common.ConfigMapEvent;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapList;
+import io.fabric8.kubernetes.api.model.DoneableConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.Resource;
+
+public class KubernetesConfigMapsConsumer extends DefaultConsumer {
+
+ private final Processor processor;
+ private ExecutorService executor;
+ private ConfigMapsConsumerTask configMapWatcher;
+
+ public KubernetesConfigMapsConsumer(AbstractKubernetesEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ this.processor = processor;
+ }
+
+ @Override
+ public AbstractKubernetesEndpoint getEndpoint() {
+ return (AbstractKubernetesEndpoint)super.getEndpoint();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ executor = getEndpoint().createExecutor();
+
+ configMapWatcher = new ConfigMapsConsumerTask();
+ executor.submit(configMapWatcher);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+
+ log.debug("Stopping Kubernetes ConfigMap Consumer");
+ if (executor != null) {
+ if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
+ if (configMapWatcher != null) {
+ configMapWatcher.getWatch().close();
+ }
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ } else {
+ if (configMapWatcher != null) {
+ configMapWatcher.getWatch().close();
+ }
+ executor.shutdownNow();
+ }
+ }
+ executor = null;
+ }
+
+ class ConfigMapsConsumerTask implements Runnable {
+
+ private Watch watch;
+
+ @Override
+ public void run() {
+ NonNamespaceOperation<ConfigMap, ConfigMapList, DoneableConfigMap, Resource<ConfigMap, DoneableConfigMap>> w = getEndpoint().getKubernetesClient().configMaps();
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey())
+ && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
+ w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+ }
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
+ w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
+ }
+ watch = w.watch(new Watcher<ConfigMap>() {
+
+ @Override
+ public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, ConfigMap resource) {
+ ConfigMapEvent de = new ConfigMapEvent(action, resource);
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(de.getConfigMap());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, de.getAction());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
+ }
+
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ if (cause != null) {
+ log.error(cause.getMessage(), cause);
+ }
+
+ }
+ });
+ }
+
+ public Watch getWatch() {
+ return watch;
+ }
+
+ public void setWatch(Watch watch) {
+ this.watch = watch;
+ }
+ }
+}
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java
index 616dbfc..26dc3c9 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/config_maps/KubernetesConfigMapsEndpoint.java
@@ -43,7 +43,7 @@ public class KubernetesConfigMapsEndpoint extends AbstractKubernetesEndpoint {
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- throw new IllegalArgumentException("The kubernetes-configmaps doesn't support consumer");
+ return new KubernetesConfigMapsConsumer(this, processor);
}
}
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java
new file mode 100644
index 0000000..d225219
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/ConfigMapEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer.common;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.Watcher.Action;
+
+public class ConfigMapEvent {
+ private io.fabric8.kubernetes.client.Watcher.Action action;
+
+ private ConfigMap configMap;
+
+ public ConfigMapEvent(Action action, ConfigMap configMap) {
+ this.action = action;
+ this.configMap = configMap;
+ }
+
+ public io.fabric8.kubernetes.client.Watcher.Action getAction() {
+ return action;
+ }
+
+ public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) {
+ this.action = action;
+ }
+
+ public ConfigMap getConfigMap() {
+ return configMap;
+ }
+
+ public void setConfigMap(ConfigMap configMap) {
+ this.configMap = configMap;
+ }
+}