You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/11/29 11:31:37 UTC
camel git commit: CAMEL-9376: Camel-Kubernetes: add a Namespace-based
consumer
Repository: camel
Updated Branches:
refs/heads/master 98d4f6af3 -> 5d305baa8
CAMEL-9376: Camel-Kubernetes: add a Namespace-based consumer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d305baa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d305baa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d305baa
Branch: refs/heads/master
Commit: 5d305baa81c46429d97d354408a994c8becebebc
Parents: 98d4f6a
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sun Nov 29 11:25:32 2015 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sun Nov 29 11:25:32 2015 +0100
----------------------------------------------------------------------
.../kubernetes/KubernetesEndpoint.java | 4 +
.../consumer/KubernetesNamespacesConsumer.java | 120 +++++++++++++++
.../consumer/common/NamespaceEvent.java | 48 ++++++
.../KubernetesNamespacesConsumerTest.java | 146 +++++++++++++++++++
4 files changed, 318 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
index 8e1f638..2f45522 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.component.kubernetes.consumer.KubernetesNamespacesConsumer;
import org.apache.camel.component.kubernetes.consumer.KubernetesPodsConsumer;
import org.apache.camel.component.kubernetes.consumer.KubernetesReplicationControllersConsumer;
import org.apache.camel.component.kubernetes.consumer.KubernetesSecretsConsumer;
@@ -132,6 +133,9 @@ public class KubernetesEndpoint extends DefaultEndpoint {
case KubernetesCategory.SECRETS:
return new KubernetesSecretsConsumer(this, processor);
+
+ case KubernetesCategory.NAMESPACES:
+ return new KubernetesNamespacesConsumer(this, processor);
default:
throw new IllegalArgumentException("The " + category + " consumer category doesn't exist");
http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
new file mode 100644
index 0000000..ffb6665
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesEndpoint;
+import org.apache.camel.component.kubernetes.consumer.common.NamespaceEvent;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduled-poll consumer that watches Kubernetes namespaces and turns every
+ * received watch event into a Camel exchange.
+ * <p>
+ * Watch callbacks buffer events in {@link #map}; each {@link #poll()} drains
+ * the buffer, sending one exchange per event with the namespace as body and
+ * the action and timestamp as headers.
+ */
+public class KubernetesNamespacesConsumer extends ScheduledPollConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesNamespacesConsumer.class);
+
+    // Created eagerly so that poll() and doStop() never observe a null buffer,
+    // whatever point doStart() has reached.
+    private final ConcurrentMap<Long, NamespaceEvent> map = new ConcurrentHashMap<Long, NamespaceEvent>();
+
+    // Handle of the active watch, kept so it can be closed in doStop().
+    // Fully qualified, like Watcher.Action below, to leave the import block untouched.
+    private io.fabric8.kubernetes.client.Watch watch;
+
+    public KubernetesNamespacesConsumer(KubernetesEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    public KubernetesEndpoint getEndpoint() {
+        return (KubernetesEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Starts polling and registers the namespace watch.
+     * <p>
+     * The watch is restricted to the configured namespace name when one is
+     * set, otherwise it covers all namespaces. No watch is registered when no
+     * OAuth token is configured.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (ObjectHelper.isEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
+            // Previously this case was skipped silently, leaving a consumer that never fires.
+            LOG.warn("No OAuth token configured: namespace events will not be watched");
+            return;
+        }
+
+        String namespaceName = getEndpoint().getKubernetesConfiguration().getNamespaceName();
+        Watcher<Namespace> watcher = new NamespaceEventWatcher();
+        if (ObjectHelper.isNotEmpty(namespaceName)) {
+            watch = getEndpoint().getKubernetesClient().namespaces().withName(namespaceName).watch(watcher);
+        } else {
+            watch = getEndpoint().getKubernetesClient().namespaces().watch(watcher);
+        }
+    }
+
+    /**
+     * Stops polling, closes the watch (if any) and discards buffered events.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (watch != null) {
+            watch.close();
+            watch = null;
+        }
+        map.clear();
+    }
+
+    /**
+     * Sends one exchange per buffered event and removes each event once its
+     * exchange has been processed.
+     *
+     * @return the number of events actually processed during this poll
+     */
+    @Override
+    protected int poll() throws Exception {
+        int processed = 0;
+        for (ConcurrentMap.Entry<Long, NamespaceEvent> entry : map.entrySet()) {
+            NamespaceEvent namespaceEvent = entry.getValue();
+            Exchange e = getEndpoint().createExchange();
+            e.getIn().setBody(namespaceEvent.getNamespace());
+            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, namespaceEvent.getAction());
+            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey());
+            getProcessor().process(e);
+            // Removed only after processing: an event whose processing throws stays buffered.
+            map.remove(entry.getKey());
+            processed++;
+        }
+        return processed;
+    }
+
+    /**
+     * Buffers every received namespace event, keyed by its arrival time.
+     */
+    private final class NamespaceEventWatcher implements Watcher<Namespace> {
+
+        @Override
+        public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Namespace resource) {
+            NamespaceEvent ne = new NamespaceEvent(action, resource);
+            // Two events may arrive within the same millisecond; bump the key
+            // until it is free so that neither event overwrites the other.
+            long timestamp = System.currentTimeMillis();
+            while (map.putIfAbsent(timestamp, ne) != null) {
+                timestamp++;
+            }
+        }
+
+        @Override
+        public void onClose(KubernetesClientException cause) {
+            if (cause != null) {
+                LOG.error(cause.getMessage(), cause);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java
new file mode 100644
index 0000000..d8e1dab
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NamespaceEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer.common;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.client.Watcher.Action;
+
+/**
+ * Mutable holder pairing a watched {@link Namespace} with the watch
+ * {@link Action} that was reported for it.
+ */
+public class NamespaceEvent {
+
+    private Action action;
+    private Namespace namespace;
+
+    /**
+     * @param action    the watch action reported for the namespace
+     * @param namespace the namespace the action refers to
+     */
+    public NamespaceEvent(Action action, Namespace namespace) {
+        this.namespace = namespace;
+        this.action = action;
+    }
+
+    /** @return the watch action carried by this event */
+    public Action getAction() {
+        return this.action;
+    }
+
+    /** @param action the watch action to store */
+    public void setAction(Action action) {
+        this.action = action;
+    }
+
+    /** @return the namespace carried by this event */
+    public Namespace getNamespace() {
+        return this.namespace;
+    }
+
+    /** @param namespace the namespace to store */
+    public void setNamespace(Namespace namespace) {
+        this.namespace = namespace;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d305baa/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
new file mode 100644
index 0000000..fb75241
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.consumer;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.Namespace;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.component.kubernetes.KubernetesTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.junit.Test;
+
+/**
+ * Integration test for the namespace consumer: creates, lists and deletes a
+ * namespace through producer routes and expects the consumer route to deliver
+ * the resulting ADDED, MODIFIED and DELETED events to {@code mock:result}.
+ * <p>
+ * The test body is skipped when {@code authToken} is empty.
+ */
+public class KubernetesNamespacesConsumerTest extends KubernetesTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mockResultEndpoint;
+
+    // NOTE(review): despite its name this test creates and deletes a namespace,
+    // not a pod; the name is kept so existing test selectors keep working.
+    @Test
+    public void createAndDeletePod() throws Exception {
+        if (ObjectHelper.isEmpty(authToken)) {
+            return;
+        }
+
+        mockResultEndpoint.expectedMessageCount(3);
+        mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION, "ADDED",
+                "MODIFIED", "DELETED");
+
+        // Create the "test" namespace with a label used for the lookup below.
+        Exchange ex = template.request("direct:createNamespace", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "test");
+                Map<String, String> labels = new HashMap<String, String>();
+                labels.put("this", "rocks");
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_LABELS, labels);
+            }
+        });
+
+        Namespace ns = ex.getOut().getBody(Namespace.class);
+
+        // Expected value first, so a failure message reports the values the right way round.
+        assertEquals("test", ns.getMetadata().getName());
+
+        // The new namespace must be found when listing by its label.
+        ex = template.request("direct:listByLabels", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                Map<String, String> labels = new HashMap<String, String>();
+                labels.put("this", "rocks");
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_LABELS, labels);
+            }
+        });
+
+        // getBody(List.class) can only yield a raw List; the elements are read as Namespace below.
+        @SuppressWarnings("unchecked")
+        List<Namespace> result = ex.getOut().getBody(List.class);
+
+        boolean testExists = false;
+        for (Namespace namespace : result) {
+            if ("test".equalsIgnoreCase(namespace.getMetadata().getName())) {
+                testExists = true;
+            }
+        }
+
+        assertTrue(testExists);
+
+        // Delete the namespace again.
+        ex = template.request("direct:deleteNamespace", new Processor() {
+
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "test");
+            }
+        });
+
+        boolean nsDeleted = ex.getOut().getBody(Boolean.class);
+
+        assertTrue(nsDeleted);
+
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
+    /**
+     * Defines the producer routes driving the namespace operations and the
+     * consumer route under test, which logs each event and forwards it to the
+     * mock endpoint.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:list").toF("kubernetes://%s?oauthToken=%s&category=namespaces&operation=listNamespaces",
+                        host, authToken);
+                from("direct:listByLabels").toF(
+                        "kubernetes://%s?oauthToken=%s&category=namespaces&operation=listNamespacesByLabels", host,
+                        authToken);
+                from("direct:getNs").toF("kubernetes://%s?oauthToken=%s&category=namespaces&operation=getNamespace",
+                        host, authToken);
+                from("direct:createNamespace").toF(
+                        "kubernetes://%s?oauthToken=%s&category=namespaces&operation=createNamespace", host, authToken);
+                from("direct:deleteNamespace").toF(
+                        "kubernetes://%s?oauthToken=%s&category=namespaces&operation=deleteNamespace", host, authToken);
+                fromF("kubernetes://%s?oauthToken=%s&category=namespaces", host, authToken).process(
+                        new KubernertesProcessor()).to(mockResultEndpoint);
+            }
+        };
+    }
+
+    /**
+     * Logs the body and event action of every exchange delivered by the
+     * consumer route. (The class name's spelling is kept as-is for compatibility.)
+     */
+    public class KubernertesProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            Message in = exchange.getIn();
+            log.info("Got event with body: " + in.getBody() + " and action "
+                    + in.getHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION));
+        }
+    }
+}