You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/07/11 08:48:34 UTC

[3/4] camel git commit: CAMEL-10134: Camel-Kubernetes: Add the ability to consume events from Resources filtered by labels and/or name - Replication Controller and Service Consumers

CAMEL-10134: Camel-Kubernetes: Add the ability to consume events from Resources filtered by labels and/or name - Replication Controller and Service Consumers


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bef891e2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bef891e2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bef891e2

Branch: refs/heads/master
Commit: bef891e25b041df3a81ea72a8adfa5907d4654f2
Parents: 4ac33c0
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Jul 11 10:01:40 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Jul 11 10:01:40 2016 +0200

----------------------------------------------------------------------
 ...ubernetesReplicationControllersConsumer.java | 104 ++++++++-----------
 .../consumer/KubernetesServicesConsumer.java    |  98 ++++++++---------
 ...netesReplicationControllersConsumerTest.java |   3 +-
 .../KubernetesServicesConsumerTest.java         |   2 +-
 4 files changed, 88 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
index 03f9be3..76c8347 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
@@ -18,9 +18,17 @@ package org.apache.camel.component.kubernetes.consumer;
 
 import java.util.concurrent.ExecutorService;
 
+import io.fabric8.kubernetes.api.model.DoneablePod;
+import io.fabric8.kubernetes.api.model.DoneableReplicationController;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.ReplicationController;
+import io.fabric8.kubernetes.api.model.ReplicationControllerList;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.ClientMixedOperation;
+import io.fabric8.kubernetes.client.dsl.ClientPodResource;
+import io.fabric8.kubernetes.client.dsl.ClientRollableScallableResource;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -77,66 +85,44 @@ public class KubernetesReplicationControllersConsumer extends DefaultConsumer {
          
         @Override
         public void run() {
-            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
-                if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
-                    getEndpoint().getKubernetesClient().replicationControllers()
-                            .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace())
-                            .watch(new Watcher<ReplicationController>() {
-
-                                @Override
-                                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                        ReplicationController resource) {
-                                    ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource);
-                                    Exchange exchange = getEndpoint().createExchange();
-                                    exchange.getIn().setBody(rce.getReplicationController());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
-                                    try {
-                                        processor.process(exchange);
-                                    } catch (Exception e) {
-                                        getExceptionHandler().handleException("Error during processing", exchange, e);
-                                    }
-
-                                }
-
-                                @Override
-                                public void onClose(KubernetesClientException cause) {
-                                    if (cause != null) {
-                                        LOG.error(cause.getMessage(), cause);
-                                    }
-                                }
-
-                            });
-                } else {
-                    getEndpoint().getKubernetesClient().replicationControllers()
-                            .watch(new Watcher<ReplicationController>() {
-
-                                @Override
-                                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                        ReplicationController resource) {
-                                    ReplicationControllerEvent se = new ReplicationControllerEvent(action, resource);
-                                    ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource);
-                                    Exchange exchange = getEndpoint().createExchange();
-                                    exchange.getIn().setBody(rce.getReplicationController());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
-                                    try {
-                                        processor.process(exchange);
-                                    } catch (Exception e) {
-                                        getExceptionHandler().handleException("Error during processing", exchange, e);
-                                    }
-                                }
-
-                                @Override
-                                public void onClose(KubernetesClientException cause) {
-                                    if (cause != null) {
-                                        LOG.error(cause.getMessage(), cause);
-                                    }
-                                }
-                            });
-                }
+            ClientMixedOperation<ReplicationController, ReplicationControllerList, DoneableReplicationController, 
+                ClientRollableScallableResource<ReplicationController, DoneableReplicationController>> w = getEndpoint().getKubernetesClient().replicationControllers();
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
+                w.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace());
+            }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey()) 
+                && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
+                w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+            }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
+                w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
             }
+            w.watch(new Watcher<ReplicationController>() {
+
+                @Override
+                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                    ReplicationController resource) {
+                    ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource);
+                    Exchange exchange = getEndpoint().createExchange();
+                    exchange.getIn().setBody(rce.getReplicationController());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        getExceptionHandler().handleException("Error during processing", exchange, e);
+                    }
+
+                }
+
+                @Override
+                public void onClose(KubernetesClientException cause) {
+                    if (cause != null) {
+                        LOG.error(cause.getMessage(), cause);
+                    }
+                }
+
+            });
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
index b51746f..2d658bb 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
@@ -18,9 +18,13 @@ package org.apache.camel.component.kubernetes.consumer;
 
 import java.util.concurrent.ExecutorService;
 
+import io.fabric8.kubernetes.api.model.DoneableService;
 import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceList;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.ClientMixedOperation;
+import io.fabric8.kubernetes.client.dsl.ClientResource;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -76,63 +80,43 @@ public class KubernetesServicesConsumer extends DefaultConsumer {
         
         @Override
         public void run() {
-            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
-                if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
-                    getEndpoint().getKubernetesClient().services()
-                            .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace())
-                            .watch(new Watcher<Service>() {
-
-                                @Override
-                                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                        Service resource) {
-                                    ServiceEvent se = new ServiceEvent(action, resource);
-                                    Exchange exchange = getEndpoint().createExchange();
-                                    exchange.getIn().setBody(se.getService());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
-                                    try {
-                                        processor.process(exchange);
-                                    } catch (Exception e) {
-                                        getExceptionHandler().handleException("Error during processing", exchange, e);
-                                    }
-
-                                }
-
-                                @Override
-                                public void onClose(KubernetesClientException cause) {
-                                    if (cause != null) {
-                                        LOG.error(cause.getMessage(), cause);
-                                    }
-                                }
-
-                            });
-                } else {
-                    getEndpoint().getKubernetesClient().services().watch(new Watcher<Service>() {
-
-                        @Override
-                        public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service resource) {
-                            ServiceEvent se = new ServiceEvent(action, resource);
-                            Exchange exchange = getEndpoint().createExchange();
-                            exchange.getIn().setBody(se.getService());
-                            exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction());
-                            exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
-                            try {
-                                processor.process(exchange);
-                            } catch (Exception e) {
-                                getExceptionHandler().handleException("Error during processing", exchange, e);
-                            }
-                        }
-
-                        @Override
-                        public void onClose(KubernetesClientException cause) {
-                            if (cause != null) {
-                                LOG.error(cause.getMessage(), cause);
-                            }
-                        }
-                    });
-                }
+            ClientMixedOperation<Service, ServiceList, DoneableService, ClientResource<Service, DoneableService>> w = getEndpoint().getKubernetesClient().services();
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
+                w.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace());
             }
-        }
-    }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey()) 
+                && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
+                w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+            }
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
+                w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
+            }
+            w.watch(new Watcher<Service>() {
+
+                @Override
+                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                    Service resource) {
+                    ServiceEvent se = new ServiceEvent(action, resource);
+                    Exchange exchange = getEndpoint().createExchange();
+                    exchange.getIn().setBody(se.getService());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        getExceptionHandler().handleException("Error during processing", exchange, e);
+                    }
 
+                }
+
+                @Override
+                public void onClose(KubernetesClientException cause) {
+                    if (cause != null) {
+                        LOG.error(cause.getMessage(), cause);
+                    }
+                }
+
+            });
+        } 
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
index e8835ec..5cb5d15 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.kubernetes.consumer;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.PodTemplateSpec;
@@ -115,7 +114,7 @@ public class KubernetesReplicationControllersConsumerTest extends KubernetesTest
                 from("direct:deleteReplicationController").toF(
                         "kubernetes://%s?oauthToken=%s&category=replicationControllers&operation=deleteReplicationController",
                         host, authToken);
-                fromF("kubernetes://%s?oauthToken=%s&category=replicationControllers", host, authToken)
+                fromF("kubernetes://%s?oauthToken=%s&category=replicationControllers&resourceName=wildfly", host, authToken)
                         .process(new KubernertesProcessor()).to(mockResultEndpoint);
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
index 61f57fb..a4ada62 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
@@ -113,7 +113,7 @@ public class KubernetesServicesConsumerTest extends KubernetesTestSupport {
                         "kubernetes://%s?oauthToken=%s&category=services&operation=createService", host, authToken);
                 from("direct:deleteService").toF(
                         "kubernetes://%s?oauthToken=%s&category=services&operation=deleteService", host, authToken);
-                fromF("kubernetes://%s?oauthToken=%s&category=services", host, authToken)
+                fromF("kubernetes://%s?oauthToken=%s&category=services&labelKey=this&labelValue=rocks", host, authToken)
                         .process(new KubernertesProcessor()).to(mockResultEndpoint);
             }
         };