You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/26 10:56:18 UTC

[2/4] camel git commit: CAMEL-9683: A new toService EIP that uses a client discovery to lookup alive services and pick a service ip/port to use when calling the service from Camel route. Allows to plugin different providers.

CAMEL-9683: A new toService EIP that uses a client discovery to lookup alive services and pick a service ip/port to use when calling the service from Camel route. Allows to plugin different providers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce5886fe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce5886fe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce5886fe

Branch: refs/heads/kube-lb
Commit: ce5886fe6773088f735f9b1757c59ba5191c8f7d
Parents: ea4e10e
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Apr 26 10:03:08 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Apr 26 10:03:08 2016 +0200

----------------------------------------------------------------------
 .../processor/KubernetesProcessorFactory.java   |   7 +-
 .../KubernetesServiceCallProcessor.java         | 181 ++++++++++++++++++
 .../processor/KubernetesServiceDiscovery.java   |   3 +
 .../processor/KubernetesServiceProcessor.java   | 187 -------------------
 .../processor/RandomLoadBalancer.java           |  31 +++
 .../processor/ServiceCallLoadBalancer.java      |  25 +++
 6 files changed, 244 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
index 2cc103d..9596bc5 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
@@ -30,6 +30,9 @@ import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.IntrospectionSupport;
 
+/**
+ * {@link ProcessorFactory} that creates the Kubernetes implementation of the ServiceCall EIP.
+ */
 public class KubernetesProcessorFactory implements ProcessorFactory {
 
     @Override
@@ -71,14 +74,12 @@ public class KubernetesProcessorFactory implements ProcessorFactory {
             KubernetesConfiguration kc = new KubernetesConfiguration();
             IntrospectionSupport.setProperties(kc, parameters);
 
-            // TODO: allow to specify kubernetes/openshift etc
-
             // use namespace from config if not provided
             if (namespace == null) {
                 namespace = kc.getNamespace();
             }
 
-            return new KubernetesServiceProcessor(name, namespace, uri, mep, kc);
+            return new KubernetesServiceCallProcessor(name, namespace, uri, mep, kc);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
new file mode 100644
index 0000000..48e4c7c
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceCallProcessor.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.openshift.client.DefaultOpenShiftClient;
+import io.fabric8.openshift.client.OpenShiftClient;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Traceable;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kubernetes based implementation of the ServiceCall EIP.
+ */
+public class KubernetesServiceCallProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceCallProcessor.class);
+
+    private String id;
+    private final String name;
+    private final String namespace;
+    private final String uri;
+    private final ExchangePattern exchangePattern;
+    private final KubernetesConfiguration configuration;
+
+    private KubernetesServiceDiscovery discovery;
+    private ServiceCallLoadBalancer loadBalancer = new RandomLoadBalancer();
+
+    // TODO: allow plugging in a custom load balancer such as Ribbon
+
+    public KubernetesServiceCallProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) {
+        this.name = name;
+        this.namespace = namespace;
+        this.uri = uri;
+        this.exchangePattern = exchangePattern;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        List<Server> servers = null;
+        try {
+            servers = discovery.getUpdatedListOfServers();
+            if (servers == null || servers.isEmpty()) {
+                exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace));
+            }
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            callback.done(true);
+            return true;
+        }
+
+        // let the client load balancer choose which server to use
+        Server server = loadBalancer.choseServer(servers);
+        String ip = server.getIp();
+        int port = server.getPort();
+        LOG.debug("Random selected service {} active at server: {}:{}", name, ip, port);
+
+        // build uri based on the name
+
+        // TODO build uri
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "kubernetes";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notEmpty(name, "name", this);
+        ObjectHelper.notEmpty(namespace, "namespace", this);
+        ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this);
+
+        discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient());
+        ServiceHelper.startService(discovery);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(discovery);
+    }
+
+    private OpenShiftClient createKubernetesClient() {
+        // TODO: need to use OpenShiftClient until fabric8-client can auto detect OS vs Kube environment
+        LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString());
+
+        ConfigBuilder builder = new ConfigBuilder();
+        builder.withMasterUrl(configuration.getMasterUrl());
+        if ((ObjectHelper.isNotEmpty(configuration.getUsername())
+                && ObjectHelper.isNotEmpty(configuration.getPassword()))
+                && ObjectHelper.isEmpty(configuration.getOauthToken())) {
+            builder.withUsername(configuration.getUsername());
+            builder.withPassword(configuration.getPassword());
+        } else {
+            builder.withOauthToken(configuration.getOauthToken());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
+            builder.withCaCertData(configuration.getCaCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
+            builder.withCaCertFile(configuration.getCaCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
+            builder.withClientCertData(configuration.getClientCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
+            builder.withClientCertFile(configuration.getClientCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
+            builder.withApiVersion(configuration.getApiVersion());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
+            builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
+            builder.withClientKeyData(configuration.getClientKeyData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
+            builder.withClientKeyFile(configuration.getClientKeyFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
+            builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
+            builder.withTrustCerts(configuration.getTrustCerts());
+        }
+
+        Config conf = builder.build();
+        return new DefaultOpenShiftClient(conf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
index ce870c9..81e1fca 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceDiscovery.java
@@ -30,6 +30,9 @@ import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Discovers services in Kubernetes.
+ */
 public class KubernetesServiceDiscovery extends ServiceSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceDiscovery.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
deleted file mode 100644
index a07c047..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesServiceProcessor.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.processor;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.RejectedExecutionException;
-
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
-import io.fabric8.openshift.client.DefaultOpenShiftClient;
-import io.fabric8.openshift.client.OpenShiftClient;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Traceable;
-import org.apache.camel.component.kubernetes.KubernetesConfiguration;
-import org.apache.camel.spi.IdAware;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KubernetesServiceProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KubernetesServiceProcessor.class);
-
-    private String id;
-    private final String name;
-    private final String namespace;
-    private final String uri;
-    private final ExchangePattern exchangePattern;
-    private final KubernetesConfiguration configuration;
-
-    private KubernetesServiceDiscovery discovery;
-
-    // TODO: allow to plugin custom load balancer like ribbon
-
-    public KubernetesServiceProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, KubernetesConfiguration configuration) {
-        this.name = name;
-        this.namespace = namespace;
-        this.uri = uri;
-        this.exchangePattern = exchangePattern;
-        this.configuration = configuration;
-    }
-
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    @Override
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        // TODO: in try .. catch and the callback stuff
-
-        List<Server> services = null;
-        try {
-            services = discovery.getUpdatedListOfServers();
-            if (services == null || services.isEmpty()) {
-                exchange.setException(new RejectedExecutionException("No active services with name " + name + " in namespace " + namespace));
-            }
-        } catch (Throwable e) {
-            exchange.setException(e);
-        }
-
-        if (exchange.getException() != null) {
-            callback.done(true);
-            return true;
-        }
-
-        // what strategy to use? random
-        int size = services.size();
-        int ran = new Random().nextInt(size);
-        Server server = services.get(ran);
-
-        String ip = server.getIp();
-        int port = server.getPort();
-
-        LOG.debug("Random selected service {} active at: {}:{}", name, ip, port);
-
-        // build uri based on the name
-
-
-        // TODO: lookup service
-        // TODO: apply LB strategy
-        // TODO build uri
-        callback.done(true);
-        return true;
-    }
-
-    @Override
-    public String getId() {
-        return id;
-    }
-
-    @Override
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public String getTraceLabel() {
-        return "kubernetes";
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        ObjectHelper.notEmpty(name, "name", this);
-        ObjectHelper.notEmpty(namespace, "namespace", this);
-        ObjectHelper.notEmpty(configuration.getMasterUrl(), "masterUrl", this);
-
-        discovery = new KubernetesServiceDiscovery(name, namespace, null, createKubernetesClient());
-        ServiceHelper.startService(discovery);
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        ServiceHelper.stopService(discovery);
-    }
-
-    private OpenShiftClient createKubernetesClient() {
-        // TODO: need to use OpenShiftClient until fabric8-client can auto detect OS vs Kube environment
-        LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString());
-
-        ConfigBuilder builder = new ConfigBuilder();
-        builder.withMasterUrl(configuration.getMasterUrl());
-        if ((ObjectHelper.isNotEmpty(configuration.getUsername())
-                && ObjectHelper.isNotEmpty(configuration.getPassword()))
-                && ObjectHelper.isEmpty(configuration.getOauthToken())) {
-            builder.withUsername(configuration.getUsername());
-            builder.withPassword(configuration.getPassword());
-        } else {
-            builder.withOauthToken(configuration.getOauthToken());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
-            builder.withCaCertData(configuration.getCaCertData());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
-            builder.withCaCertFile(configuration.getCaCertFile());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
-            builder.withClientCertData(configuration.getClientCertData());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
-            builder.withClientCertFile(configuration.getClientCertFile());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
-            builder.withApiVersion(configuration.getApiVersion());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
-            builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
-            builder.withClientKeyData(configuration.getClientKeyData());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
-            builder.withClientKeyFile(configuration.getClientKeyFile());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
-            builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
-            builder.withTrustCerts(configuration.getTrustCerts());
-        }
-
-        Config conf = builder.build();
-        return new DefaultOpenShiftClient(conf);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
new file mode 100644
index 0000000..aa35384
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/RandomLoadBalancer.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.List;
+import java.util.Random;
+
+public class RandomLoadBalancer implements ServiceCallLoadBalancer {
+
+    @Override
+    public Server choseServer(List<Server> services) {
+        int size = services.size();
+        int ran = new Random().nextInt(size);
+        Server server = services.get(ran);
+        return server;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce5886fe/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java
new file mode 100644
index 0000000..28b5e66
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/ServiceCallLoadBalancer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.List;
+
+public interface ServiceCallLoadBalancer {
+
+    Server choseServer(List<Server> services);
+
+}