You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/11/29 11:31:37 UTC

camel git commit: CAMEL-9376: Camel-Kubernetes: add a Namespace-based consumer

Repository: camel
Updated Branches:
  refs/heads/master 98d4f6af3 -> 5d305baa8


CAMEL-9376: Camel-Kubernetes: add a Namespace-based consumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d305baa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d305baa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d305baa

Branch: refs/heads/master
Commit: 5d305baa81c46429d97d354408a994c8becebebc
Parents: 98d4f6a
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sun Nov 29 11:25:32 2015 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sun Nov 29 11:25:32 2015 +0100

----------------------------------------------------------------------
 .../kubernetes/KubernetesEndpoint.java          |   4 +
 .../consumer/KubernetesNamespacesConsumer.java  | 120 +++++++++++++++
 .../consumer/common/NamespaceEvent.java         |  48 ++++++
 .../KubernetesNamespacesConsumerTest.java       | 146 +++++++++++++++++++
 4 files changed, 318 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
index 8e1f638..2f45522 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.kubernetes.consumer.KubernetesNamespacesConsumer;
 import org.apache.camel.component.kubernetes.consumer.KubernetesPodsConsumer;
 import org.apache.camel.component.kubernetes.consumer.KubernetesReplicationControllersConsumer;
 import org.apache.camel.component.kubernetes.consumer.KubernetesSecretsConsumer;
@@ -132,6 +133,9 @@ public class KubernetesEndpoint extends DefaultEndpoint {
 
             case KubernetesCategory.SECRETS:
                 return new KubernetesSecretsConsumer(this, processor);
+                
+            case KubernetesCategory.NAMESPACES:
+                return new KubernetesNamespacesConsumer(this, processor);
 
             default:
                 throw new IllegalArgumentException("The " + category + " consumer category doesn't exist");

http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
new file mode 100644
index 0000000..ffb6665
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesEndpoint;
+import org.apache.camel.component.kubernetes.consumer.common.NamespaceEvent;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesNamespacesConsumer extends ScheduledPollConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesNamespacesConsumer.class);
+
+    private ConcurrentMap<Long, NamespaceEvent> map;
+
+    public KubernetesNamespacesConsumer(KubernetesEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    public KubernetesEndpoint getEndpoint() {
+        return (KubernetesEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        map = new ConcurrentHashMap<Long, NamespaceEvent>();
+
+        if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) {
+                getEndpoint().getKubernetesClient().namespaces()
+                        .withName(getEndpoint().getKubernetesConfiguration().getNamespaceName())
+                        .watch(new Watcher<Namespace>() {
+
+                            @Override
+                            public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                    Namespace resource) {
+                                NamespaceEvent ne = new NamespaceEvent(action, resource);
+                                map.put(System.currentTimeMillis(), ne);
+                                
+                            }
+
+                            @Override
+                            public void onClose(KubernetesClientException cause) {
+                                if (cause != null) {
+                                    LOG.error(cause.getMessage(), cause);
+                                }                            
+                            }
+                        });
+            } else {
+                getEndpoint().getKubernetesClient().namespaces().watch(new Watcher<Namespace>() {
+
+                    @Override
+                    public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                            Namespace resource) {
+                        NamespaceEvent ne = new NamespaceEvent(action, resource);
+                        map.put(System.currentTimeMillis(), ne);
+                        
+                    }
+
+                    @Override
+                    public void onClose(KubernetesClientException cause) {
+                        if (cause != null) {
+                            LOG.error(cause.getMessage(), cause);
+                        }                            
+                    }
+                });
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        map.clear();
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        int mapSize = map.size();
+        for (ConcurrentMap.Entry<Long, NamespaceEvent> entry : map.entrySet()) {
+            NamespaceEvent namespaceEvent = (NamespaceEvent) entry.getValue();
+            Exchange e = getEndpoint().createExchange();
+            e.getIn().setBody(namespaceEvent.getNamespace());
+            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, namespaceEvent.getAction());
+            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey());
+            getProcessor().process(e);
+            map.remove(entry.getKey());
+        }
+        return mapSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java
new file mode 100644
index 0000000..d8e1dab
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer.common;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.client.Watcher.Action;
+
+/** Pairs a watched {@link Namespace} with the watch {@link Action} reported for it. */
+public class NamespaceEvent {
+    private Action action;
+
+    private Namespace namespace;
+
+    public NamespaceEvent(Action action, Namespace namespace) {
+        this.action = action;
+        this.namespace = namespace;
+    }
+
+    public Action getAction() {
+        return action;
+    }
+
+    public void setAction(Action action) {
+        this.action = action;
+    }
+
+    public Namespace getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(Namespace namespace) {
+        this.namespace = namespace;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
new file mode 100644
index 0000000..fb75241
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.junit.Test;
+
+/**
+ * Integration test for the namespaces consumer: creates, lists by label and deletes a namespace, then
+ * checks the events received on the mock endpoint. Returns immediately when no auth token is set.
+ */
+public class KubernetesNamespacesConsumerTest extends KubernetesTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mockResultEndpoint;
+
+    @Test
+    public void createAndDeletePod() throws Exception {
+        if (ObjectHelper.isEmpty(authToken)) {
+            return;
+        }
+
+        mockResultEndpoint.expectedMessageCount(3);
+        mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION, "ADDED",
+                "MODIFIED", "DELETED");
+
+        // Create a namespace named "test" carrying the label this=rocks.
+        Exchange ex = template.request("direct:createNamespace", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "test");
+                Map<String, String> labels = new HashMap<String, String>();
+                labels.put("this", "rocks");
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_LABELS, labels);
+            }
+        });
+
+        Namespace ns = ex.getOut().getBody(Namespace.class);
+
+        // JUnit's signature is (expected, actual); the reverse order gives a misleading failure message.
+        assertEquals("test", ns.getMetadata().getName());
+
+        ex = template.request("direct:listByLabels", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> labels = new HashMap<String, String>();
+                labels.put("this", "rocks");
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_LABELS, labels);
+            }
+        });
+
+        List<Namespace> result = ex.getOut().getBody(List.class);
+
+        // The namespace created above must be among those returned for the label query.
+        boolean testExists = false;
+
+        Iterator<Namespace> it = result.iterator();
+        while (it.hasNext()) {
+            Namespace namespace = it.next();
+            if ("test".equalsIgnoreCase(namespace.getMetadata().getName())) {
+                testExists = true;
+            }
+        }
+
+        assertTrue(testExists);
+
+        ex = template.request("direct:deleteNamespace", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "test");
+            }
+        });
+
+        boolean nsDeleted = ex.getOut().getBody(Boolean.class);
+
+        assertTrue(nsDeleted);
+
+        // Expects the three watch events (ADDED, MODIFIED, DELETED) declared at the top of the test.
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:list").toF("kubernetes://%s?oauthToken=%s&category=namespaces&operation=listNamespaces",
+                        host, authToken);
+                from("direct:listByLabels").toF(
+                        "kubernetes://%s?oauthToken=%s&category=namespaces&operation=listNamespacesByLabels", host,
+                        authToken);
+                from("direct:getNs").toF("kubernetes://%s?oauthToken=%s&category=namespaces&operation=getNamespace",
+                        host, authToken);
+                from("direct:createNamespace").toF(
+                        "kubernetes://%s?oauthToken=%s&category=namespaces&operation=createNamespace", host, authToken);
+                from("direct:deleteNamespace").toF(
+                        "kubernetes://%s?oauthToken=%s&category=namespaces&operation=deleteNamespace", host, authToken);
+                // Consumer route under test: namespace events are logged and forwarded to the mock endpoint.
+                fromF("kubernetes://%s?oauthToken=%s&category=namespaces", host, authToken).process(
+                        new KubernertesProcessor()).to(mockResultEndpoint);
+            }
+        };
+    }
+
+    /** Logs the body and the event action header of every exchange coming from the consumer route. */
+    public class KubernertesProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            Message in = exchange.getIn();
+            log.info("Got event with body: " + in.getBody() + " and action "
+                    + in.getHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION));
+        }
+    }
+}