You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/07/11 08:48:33 UTC

[2/4] camel git commit: CAMEL-10134: Camel-Kubernetes: Add the ability to consume events from Resources filtered by labels and/or name - Namespaces consumer

CAMEL-10134: Camel-Kubernetes: Add the ability to consume events from Resources filtered by labels and/or name - Namespaces consumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ac33c04
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ac33c04
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ac33c04

Branch: refs/heads/master
Commit: 4ac33c046f3dd8518260360a3562e0e9558019f4
Parents: dc129c2
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Jul 11 09:42:48 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Jul 11 09:42:48 2016 +0200

----------------------------------------------------------------------
 .../consumer/KubernetesNamespacesConsumer.java  | 86 +++++++-------------
 1 file changed, 31 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4ac33c04/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
index 4786ede..9e09105 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
@@ -18,9 +18,13 @@ package org.apache.camel.component.kubernetes.consumer;
 
 import java.util.concurrent.ExecutorService;
 
+import io.fabric8.kubernetes.api.model.DoneableNamespace;
 import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.api.model.NamespaceList;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.ClientNonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.ClientResource;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -76,63 +80,35 @@ public class KubernetesNamespacesConsumer extends DefaultConsumer {
         
         @Override
         public void run() {
-            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
-                if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
-                    getEndpoint().getKubernetesClient().namespaces()
-                            .withName(getEndpoint().getKubernetesConfiguration().getNamespace())
-                            .watch(new Watcher<Namespace>() {
-
-                                @Override
-                                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                        Namespace resource) {
-                                    NamespaceEvent ne = new NamespaceEvent(action, resource);
-                                    Exchange exchange = getEndpoint().createExchange();
-                                    exchange.getIn().setBody(ne.getNamespace());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
-                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
-                                    try {
-                                        processor.process(exchange);
-                                    } catch (Exception e) {
-                                        getExceptionHandler().handleException("Error during processing", exchange, e);
-                                    }                                   
-                                }
-
-                                @Override
-                                public void onClose(KubernetesClientException cause) {
-                                    if (cause != null) {
-                                        LOG.error(cause.getMessage(), cause);
-                                    }                            
-                                }
-                            });
-                } else {
-                    getEndpoint().getKubernetesClient().namespaces().watch(new Watcher<Namespace>() {
+            ClientNonNamespaceOperation<Namespace, NamespaceList, DoneableNamespace, ClientResource<Namespace, DoneableNamespace>> w = getEndpoint().getKubernetesClient().namespaces();
+            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
+                w.withName(getEndpoint().getKubernetesConfiguration().getNamespace());
+            }
+            w.watch(new Watcher<Namespace>() {
 
-                        @Override
-                        public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                Namespace resource) {
-                            NamespaceEvent ne = new NamespaceEvent(action, resource);
-                            Exchange exchange = getEndpoint().createExchange();
-                            exchange.getIn().setBody(ne.getNamespace());
-                            exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
-                            exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
-                            try {
-                                processor.process(exchange);
-                            } catch (Exception e) {
-                                getExceptionHandler().handleException("Error during processing", exchange, e);
-                            }      
-                            
-                        }
+                @Override
+                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                    Namespace resource) {
+                    NamespaceEvent ne = new NamespaceEvent(action, resource);
+                    Exchange exchange = getEndpoint().createExchange();
+                    exchange.getIn().setBody(ne.getNamespace());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
+                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+                    try {
+                        processor.process(exchange);
+                    } catch (Exception e) {
+                        getExceptionHandler().handleException("Error during processing", exchange, e);
+                    }                                   
+                }
 
-                        @Override
-                        public void onClose(KubernetesClientException cause) {
-                            if (cause != null) {
-                                LOG.error(cause.getMessage(), cause);
-                            }                            
-                        }
-                    });
+                @Override
+                public void onClose(KubernetesClientException cause) {
+                    if (cause != null) {
+                        LOG.error(cause.getMessage(), cause);
+                    }                            
                 }
-            }
-        }
+            });
+        } 
     }
-
 }
+