You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/26 10:56:18 UTC
[2/4] camel git commit: CAMEL-9683: A new toService EIP that uses a
client discovery to lookup alive services and pick a service ip/port to use
when calling the service from Camel route. Allows to plugin different
providers.
CAMEL-9683: A new toService EIP that uses a client discovery to lookup alive services and pick a service ip/port to use when calling the service from Camel route. Allows to plugin different providers.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce5886fe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce5886fe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce5886fe
Branch: refs/heads/kube-lb
Commit: ce5886fe6773088f735f9b1757c59ba5191c8f7d
Parents: ea4e10e
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 26 10:03:08 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 26 10:03:08 2016 +0200
----------------------------------------------------------------------
.../processor/KubernetesProcessorFactory.java | 7 +-
.../KubernetesServiceCallProcessor.java | 181 ++++++++++++++++++
.../processor/KubernetesServiceDiscovery.java | 3 +
.../processor/KubernetesServiceProcessor.java | 187 -------------------
.../processor/RandomLoadBalancer.java | 31 +++
.../processor/ServiceCallLoadBalancer.java | 25 +++
6 files changed, 244 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
index 2cc103d..9596bc5 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
@@ -30,6 +30,9 @@ import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.IntrospectionSupport;
+/**
+ * {@link ProcessorFactory} that creates the Kubernetes implementation of the ServiceCall EIP.
+ */
public class KubernetesProcessorFactory implements ProcessorFactory {
@Override
@@ -71,14 +74,12 @@ public class KubernetesProcessorFactory implements ProcessorFactory {
KubernetesConfiguration kc = new KubernetesConfiguration();
IntrospectionSupport.setProperties(kc, parameters);
- // TODO: allow to specify kubernetes/openshift etc
-
// use namespace from config if not provided
if (namespace == null) {
namespace = kc.getNamespace();
}
- return new KubernetesServiceProcessor(name, namespace, uri, mep, kc);
+ return new KubernetesServiceCallProcessor(name, namespace, uri, mep, kc);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
new file mode 100644
index 0000000..48e4c7c
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.openshift.client.DefaultOpenShiftClient;
+import io.fabric8.openshift.client.OpenShiftClient;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Traceable;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kubernetes based implementation of the the ServiceCall EIP.
+ */
+public class KubernetesServiceCallProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceCallProcessor.class);
+
+ private String id;
+ private final String name;
+ private final String namespace;
+ private final String uri;
+ private final ExchangePattern exchangePattern;
+ private final KubernetesConfiguration configuration;
+
+ private KubernetesServiceDiscovery discovery;
+ private ServiceCallLoadBalancer loadBalancer = new RandomLoadBalancer();
+
+ // TODO: allow to plugin custom load balancer like ribbon
+
+ public KubernetesServiceCallProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) {
+ this.name = name;
+ this.namespace = namespace;
+ this.uri = uri;
+ this.exchangePattern = exchangePattern;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ List<Server> servers = null;
+ try {
+ servers = discovery.getUpdatedListOfServers();
+ if (servers == null || servers.isEmpty()) {
+ exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace));
+ }
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+
+ if (exchange.getException() != null) {
+ callback.done(true);
+ return true;
+ }
+
+ // let the client load balancer chose which server to use
+ Server server = loadBalancer.choseServer(servers);
+ String ip = server.getIp();
+ int port = server.getPort();
+ LOG.debug("Random selected service {} active at server: {}:{}", name, ip, port);
+
+ // build uri based on the name
+
+ // TODO build uri
+ callback.done(true);
+ return true;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getTraceLabel() {
+ return "kubernetes";
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notEmpty(name, "name", this);
+ ObjectHelper.notEmpty(namespace, "namespace", this);
+ ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this);
+
+ discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient());
+ ServiceHelper.startService(discovery);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(discovery);
+ }
+
+ private OpenShiftClient createKubernetesClient() {
+ // TODO: need to use OpenShiftClient until fabric8-client can auto detect OS vs Kube environment
+ LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString());
+
+ ConfigBuilder builder = new ConfigBuilder();
+ builder.withMasterUrl(configuration.getMasterUrl());
+ if ((ObjectHelper.isNotEmpty(configuration.getUsername())
+ && ObjectHelper.isNotEmpty(configuration.getPassword()))
+ && ObjectHelper.isEmpty(configuration.getOauthToken())) {
+ builder.withUsername(configuration.getUsername());
+ builder.withPassword(configuration.getPassword());
+ } else {
+ builder.withOauthToken(configuration.getOauthToken());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
+ builder.withCaCertData(configuration.getCaCertData());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
+ builder.withCaCertFile(configuration.getCaCertFile());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
+ builder.withClientCertData(configuration.getClientCertData());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
+ builder.withClientCertFile(configuration.getClientCertFile());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
+ builder.withApiVersion(configuration.getApiVersion());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
+ builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
+ builder.withClientKeyData(configuration.getClientKeyData());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
+ builder.withClientKeyFile(configuration.getClientKeyFile());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
+ builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
+ builder.withTrustCerts(configuration.getTrustCerts());
+ }
+
+ Config conf = builder.build();
+ return new DefaultOpenShiftClient(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
index ce870c9..81e1fca 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
@@ -30,6 +30,9 @@ import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Discovers services in Kubernetes.
+ */
public class KubernetesServiceDiscovery extends ServiceSupport {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceDiscovery.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
deleted file mode 100644
index a07c047..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.processor;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.RejectedExecutionException;
-
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
-import io.fabric8.openshift.client.DefaultOpenShiftClient;
-import io.fabric8.openshift.client.OpenShiftClient;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Traceable;
-import org.apache.camel.component.kubernetes.KubernetesConfiguration;
-import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KubernetesServiceProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
-
- private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceProcessor.class);
-
- private String id;
- private final String name;
- private final String namespace;
- private final String uri;
- private final ExchangePattern exchangePattern;
- private final KubernetesConfiguration configuration;
-
- private KubernetesServiceDiscovery discovery;
-
- // TODO: allow to plugin custom load balancer like ribbon
-
- public KubernetesServiceProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) {
- this.name = name;
- this.namespace = namespace;
- this.uri = uri;
- this.exchangePattern = exchangePattern;
- this.configuration = configuration;
- }
-
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- // TODO: in try .. catch and the callback stuff
-
- List<Server> services = null;
- try {
- services = discovery.getUpdatedListOfServers();
- if (services == null || services.isEmpty()) {
- exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace));
- }
- } catch (Throwable e) {
- exchange.setException(e);
- }
-
- if (exchange.getException() != null) {
- callback.done(true);
- return true;
- }
-
- // what strategy to use? random
- int size = services.size();
- int ran = new Random().nextInt(size);
- Server server = services.get(ran);
-
- String ip = server.getIp();
- int port = server.getPort();
-
- LOG.debug("Random selected service {} active at: {}:{}", name, ip, port);
-
- // build uri based on the name
-
-
- // TODO: lookup service
- // TODO: apply LB strategy
- // TODO build uri
- callback.done(true);
- return true;
- }
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public void setId(String id) {
- this.id = id;
- }
-
- @Override
- public String getTraceLabel() {
- return "kubernetes";
- }
-
- @Override
- protected void doStart() throws Exception {
- ObjectHelper.notEmpty(name, "name", this);
- ObjectHelper.notEmpty(namespace, "namespace", this);
- ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this);
-
- discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient());
- ServiceHelper.startService(discovery);
- }
-
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopService(discovery);
- }
-
- private OpenShiftClient createKubernetesClient() {
- // TODO: need to use OpenShiftClient until fabric8-client can auto detect OS vs Kube environment
- LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString());
-
- ConfigBuilder builder = new ConfigBuilder();
- builder.withMasterUrl(configuration.getMasterUrl());
- if ((ObjectHelper.isNotEmpty(configuration.getUsername())
- && ObjectHelper.isNotEmpty(configuration.getPassword()))
- && ObjectHelper.isEmpty(configuration.getOauthToken())) {
- builder.withUsername(configuration.getUsername());
- builder.withPassword(configuration.getPassword());
- } else {
- builder.withOauthToken(configuration.getOauthToken());
- }
- if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
- builder.withCaCertData(configuration.getCaCertData());
- }
- if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
- builder.withCaCertFile(configuration.getCaCertFile());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
- builder.withClientCertData(configuration.getClientCertData());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
- builder.withClientCertFile(configuration.getClientCertFile());
- }
- if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
- builder.withApiVersion(configuration.getApiVersion());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
- builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
- builder.withClientKeyData(configuration.getClientKeyData());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
- builder.withClientKeyFile(configuration.getClientKeyFile());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
- builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
- }
- if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
- builder.withTrustCerts(configuration.getTrustCerts());
- }
-
- Config conf = builder.build();
- return new DefaultOpenShiftClient(conf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
new file mode 100644
index 0000000..aa35384
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.List;
+import java.util.Random;
+
+public class RandomLoadBalancer implements ServiceCallLoadBalancer {
+
+ @Override
+ public Server choseServer(List<Server> services) {
+ int size = services.size();
+ int ran = new Random().nextInt(size);
+ Server server = services.get(ran);
+ return server;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java
new file mode 100644
index 0000000..28b5e66
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.List;
+
+public interface ServiceCallLoadBalancer {
+
+ Server choseServer(List<Server> services);
+
+}