You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/07/11 08:48:34 UTC
[3/4] camel git commit: CAMEL-10134: Camel-Kubernetes: Add the
ability to consume events from Resources filtered by labels and/or name -
Replication Controller and Service Consumers
CAMEL-10134: Camel-Kubernetes: Add the ability to consume events from Resources filtered by labels and/or name - Replication Controller and Service Consumers
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bef891e2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bef891e2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bef891e2
Branch: refs/heads/master
Commit: bef891e25b041df3a81ea72a8adfa5907d4654f2
Parents: 4ac33c0
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Jul 11 10:01:40 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Jul 11 10:01:40 2016 +0200
----------------------------------------------------------------------
...ubernetesReplicationControllersConsumer.java | 104 ++++++++-----------
.../consumer/KubernetesServicesConsumer.java | 98 ++++++++---------
...netesReplicationControllersConsumerTest.java | 3 +-
.../KubernetesServicesConsumerTest.java | 2 +-
4 files changed, 88 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
index 03f9be3..76c8347 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
@@ -18,9 +18,17 @@ package org.apache.camel.component.kubernetes.consumer;
import java.util.concurrent.ExecutorService;
+import io.fabric8.kubernetes.api.model.DoneablePod;
+import io.fabric8.kubernetes.api.model.DoneableReplicationController;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.ReplicationController;
+import io.fabric8.kubernetes.api.model.ReplicationControllerList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.ClientMixedOperation;
+import io.fabric8.kubernetes.client.dsl.ClientPodResource;
+import io.fabric8.kubernetes.client.dsl.ClientRollableScallableResource;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -77,66 +85,44 @@ public class KubernetesReplicationControllersConsumer extends DefaultConsumer {
@Override
public void run() {
- if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
- if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
- getEndpoint().getKubernetesClient().replicationControllers()
- .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace())
- .watch(new Watcher<ReplicationController>() {
-
- @Override
- public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
- ReplicationController resource) {
- ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(rce.getReplicationController());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
-
- }
-
- @Override
- public void onClose(KubernetesClientException cause) {
- if (cause != null) {
- LOG.error(cause.getMessage(), cause);
- }
- }
-
- });
- } else {
- getEndpoint().getKubernetesClient().replicationControllers()
- .watch(new Watcher<ReplicationController>() {
-
- @Override
- public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
- ReplicationController resource) {
- ReplicationControllerEvent se = new ReplicationControllerEvent(action, resource);
- ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(rce.getReplicationController());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
- }
-
- @Override
- public void onClose(KubernetesClientException cause) {
- if (cause != null) {
- LOG.error(cause.getMessage(), cause);
- }
- }
- });
- }
+ ClientMixedOperation<ReplicationController, ReplicationControllerList, DoneableReplicationController,
+ ClientRollableScallableResource<ReplicationController, DoneableReplicationController>> w = getEndpoint().getKubernetesClient().replicationControllers();
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
+ w.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace());
+ }
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey())
+ && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
+ w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+ }
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
+ w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
}
+ w.watch(new Watcher<ReplicationController>() {
+
+ @Override
+ public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+ ReplicationController resource) {
+ ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource);
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(rce.getReplicationController());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
+
+ }
+
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ if (cause != null) {
+ LOG.error(cause.getMessage(), cause);
+ }
+ }
+
+ });
}
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
index b51746f..2d658bb 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
@@ -18,9 +18,13 @@ package org.apache.camel.component.kubernetes.consumer;
import java.util.concurrent.ExecutorService;
+import io.fabric8.kubernetes.api.model.DoneableService;
import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.dsl.ClientMixedOperation;
+import io.fabric8.kubernetes.client.dsl.ClientResource;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -76,63 +80,43 @@ public class KubernetesServicesConsumer extends DefaultConsumer {
@Override
public void run() {
- if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
- if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
- getEndpoint().getKubernetesClient().services()
- .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace())
- .watch(new Watcher<Service>() {
-
- @Override
- public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
- Service resource) {
- ServiceEvent se = new ServiceEvent(action, resource);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(se.getService());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
-
- }
-
- @Override
- public void onClose(KubernetesClientException cause) {
- if (cause != null) {
- LOG.error(cause.getMessage(), cause);
- }
- }
-
- });
- } else {
- getEndpoint().getKubernetesClient().services().watch(new Watcher<Service>() {
-
- @Override
- public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service resource) {
- ServiceEvent se = new ServiceEvent(action, resource);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(se.getService());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction());
- exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
- }
-
- @Override
- public void onClose(KubernetesClientException cause) {
- if (cause != null) {
- LOG.error(cause.getMessage(), cause);
- }
- }
- });
- }
+ ClientMixedOperation<Service, ServiceList, DoneableService, ClientResource<Service, DoneableService>> w = getEndpoint().getKubernetesClient().services();
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
+ w.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace());
}
- }
- }
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey())
+ && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
+ w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
+ }
+ if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
+ w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
+ }
+ w.watch(new Watcher<Service>() {
+
+ @Override
+ public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+ Service resource) {
+ ServiceEvent se = new ServiceEvent(action, resource);
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(se.getService());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction());
+ exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
+ }
+
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ if (cause != null) {
+ LOG.error(cause.getMessage(), cause);
+ }
+ }
+
+ });
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
index e8835ec..5cb5d15 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.kubernetes.consumer;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
@@ -115,7 +114,7 @@ public class KubernetesReplicationControllersConsumerTest extends KubernetesTest
from("direct:deleteReplicationController").toF(
"kubernetes://%s?oauthToken=%s&category=replicationControllers&operation=deleteReplicationController",
host, authToken);
- fromF("kubernetes://%s?oauthToken=%s&category=replicationControllers", host, authToken)
+ fromF("kubernetes://%s?oauthToken=%s&category=replicationControllers&resourceName=wildfly", host, authToken)
.process(new KubernertesProcessor()).to(mockResultEndpoint);
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/bef891e2/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
index 61f57fb..a4ada62 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
@@ -113,7 +113,7 @@ public class KubernetesServicesConsumerTest extends KubernetesTestSupport {
"kubernetes://%s?oauthToken=%s&category=services&operation=createService", host, authToken);
from("direct:deleteService").toF(
"kubernetes://%s?oauthToken=%s&category=services&operation=deleteService", host, authToken);
- fromF("kubernetes://%s?oauthToken=%s&category=services", host, authToken)
+ fromF("kubernetes://%s?oauthToken=%s&category=services&labelKey=this&labelValue=rocks", host, authToken)
.process(new KubernertesProcessor()).to(mockResultEndpoint);
}
};