You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/07/11 08:48:33 UTC
[2/4] camel git commit: CAMEL-10134: Camel-Kubernetes: Add the
ability to consume events from Resources filtered by labels and/or name -
Namespaces consumer
CAMEL-10134: Camel-Kubernetes: Add the ability to consume events from Resources filtered by labels and/or name - Namespaces consumer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ac33c04
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ac33c04
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ac33c04
Branch: refs/heads/master
Commit: 4ac33c046f3dd8518260360a3562e0e9558019f4
Parents: dc129c2
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Jul 11 09:42:48 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Jul 11 09:42:48 2016 +0200
----------------------------------------------------------------------
.../consumer/KubernetesNamespacesConsumer.java | 86 +++++++-------------
1 file changed, 31 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4ac33c04/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
index 4786ede..9e09105 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
@@ -18,9 +18,13 @@ package org.apache.camel.component.kubernetes.consumer;
import java.util.concurrent.ExecutorService;
+import io.fabric8.kubernetes.api.model.DoneableNamespace;
import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.api.model.NamespaceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.ClientNonNamespaceOperation;
+import io.fabric8.kubernetes.client.dsl.ClientResource;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -76,63 +80,35 @@ public class KubernetesNamespacesConsumer extends DefaultConsumer {
@Override
public void run() {
- if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
- if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
- getEndpoint().getKubernetesClient().namespaces()
- .withName(getEndpoint().getKubernetesConfiguration().getNamespace())
- .watch(new Watcher<Namespace>() {
-
- @Override
- public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
- Namespace resource) {
- NamespaceEvent ne = new NamespaceEvent(action, resource);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(ne.getNamespace());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
- }
-
- @Override
- public void onClose(KubernetesClientException cause) {
- if (cause != null) {
- LOG.error(cause.getMessage(), cause);
- }
- }
- });
- } else {
- getEndpoint().getKubernetesClient().namespaces().watch(new Watcher<Namespace>() {
+ ClientNonNamespaceOperation<Namespace, NamespaceList, DoneableNamespace, ClientResource<Namespace, DoneableNamespace>> w = getEndpoint().getKubernetesClient().namespaces();
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
+ w.withName(getEndpoint().getKubernetesConfiguration().getNamespace());
+ }
+ w.watch(new Watcher<Namespace>() {
- @Override
- public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
- Namespace resource) {
- NamespaceEvent ne = new NamespaceEvent(action, resource);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(ne.getNamespace());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
-
- }
+ @Override
+ public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+ Namespace resource) {
+ NamespaceEvent ne = new NamespaceEvent(action, resource);
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(ne.getNamespace());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
+ }
- @Override
- public void onClose(KubernetesClientException cause) {
- if (cause != null) {
- LOG.error(cause.getMessage(), cause);
- }
- }
- });
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ if (cause != null) {
+ LOG.error(cause.getMessage(), cause);
+ }
}
- }
- }
+ });
+ }
}
-
}
+