You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/16 12:45:14 UTC

camel git commit: CAMEL-9683: Started on camel-ribbon

Repository: camel
Updated Branches:
  refs/heads/kube-lb 8b7001d4c -> 32deef6e3


CAMEL-9683: Started on camel-ribbon


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/32deef6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/32deef6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/32deef6e

Branch: refs/heads/kube-lb
Commit: 32deef6e3b2ee25b80fcbd87e922919cce10a33e
Parents: 8b7001d
Author: Claus Ibsen <da...@apache.org>
Authored: Mon May 16 14:45:04 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 16 14:45:04 2016 +0200

----------------------------------------------------------------------
 .../ServiceCallConfigurationDefinition.java     | 30 +++++++
 .../processor/RibbonProcessorFactory.java       | 25 ++++++
 .../ribbon/processor/RibbonServer.java          |  1 +
 .../processor/RibbonServiceCallProcessor.java   | 35 ++++++--
 ...bbonServiceCallStaticServerListStrategy.java | 17 ++++
 .../RibbonServiceCallUpdateRouteTest.java       | 90 ++++++++++++++++++++
 .../SpringRibbonServiceCallRouteTest.xml        | 18 +---
 7 files changed, 196 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/32deef6e/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
index 9b38565..4be884d 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ServiceCallConfigurationDefinition.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.model;
 
+import java.util.ArrayList;
+import java.util.List;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
@@ -73,6 +76,8 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType {
     private String serverListStrategyRef;
     @XmlTransient
     private ServiceCallServerListStrategy serverListStrategy;
+    @XmlElement(name = "clientProperty") @Metadata(label = "advanced")
+    private List<PropertyDefinition> properties;
 
     public ServiceCallConfigurationDefinition() {
     }
@@ -245,6 +250,14 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType {
         this.serverListStrategy = serverListStrategy;
     }
 
+    public List<PropertyDefinition> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(List<PropertyDefinition> properties) {
+        this.properties = properties;
+    }
+
     // Fluent API
     // -------------------------------------------------------------------------
 
@@ -409,6 +422,23 @@ public class ServiceCallConfigurationDefinition extends IdentifiedType {
     }
 
     /**
+     * Adds a custom client property to use.
+     * <p/>
+     * These properties are specific to the service call implementation in use. For example, if using ribbon, then
+     * the client properties are defined in com.netflix.client.config.CommonClientConfigKey.
+     */
+    public ServiceCallConfigurationDefinition clientProperty(String key, String value) {
+        if (properties == null) {
+            properties = new ArrayList<>();
+        }
+        PropertyDefinition prop = new PropertyDefinition();
+        prop.setKey(key);
+        prop.setValue(value);
+        properties.add(prop);
+        return this;
+    }
+
+    /**
      * End of configuration
      */
     public ProcessorDefinition end() {

http://git-wip-us.apache.org/repos/asf/camel/blob/32deef6e/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java
index 5a3f685..84b6bb5 100644
--- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java
+++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonProcessorFactory.java
@@ -25,6 +25,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.component.ribbon.RibbonConfiguration;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.PropertyDefinition;
 import org.apache.camel.model.ServiceCallConfigurationDefinition;
 import org.apache.camel.model.ServiceCallDefinition;
 import org.apache.camel.spi.ProcessorFactory;
@@ -124,15 +125,39 @@ public class RibbonProcessorFactory implements ProcessorFactory {
                 throw new IllegalArgumentException("Load balancer must be of type: " + IRule.class + " but is of type: " + lb.getClass().getName());
             }
 
+            Map<String, String> properties = configureProperties(routeContext, config, configRef);
+
             RibbonServiceCallProcessor processor = new RibbonServiceCallProcessor(name, namespace, uri, mep, rc);
             processor.setRule((IRule) lb);
             processor.setServerListStrategy(sl);
+            processor.setRibbonClientConfig(properties);
             return processor;
         } else {
             return null;
         }
     }
 
+    private Map<String, String> configureProperties(RouteContext routeContext, ServiceCallConfigurationDefinition config, ServiceCallConfigurationDefinition configRef) throws Exception {
+        Map<String, String> answer = new HashMap<>();
+        if (config != null && config.getProperties() != null) {
+            for (PropertyDefinition prop : config.getProperties()) {
+                // support property placeholders
+                String key = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getKey());
+                String value = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getValue());
+                answer.put(key, value);
+            }
+        }
+        if (configRef != null && configRef.getProperties() != null) {
+            for (PropertyDefinition prop : configRef.getProperties()) {
+                // support property placeholders
+                String key = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getKey());
+                String value = CamelContextHelper.parseText(routeContext.getCamelContext(), prop.getValue());
+                answer.put(key, value);
+            }
+        }
+        return answer;
+    }
+
     private Object configureLoadBalancer(RouteContext routeContext, ServiceCallDefinition sd) {
         Object lb = null;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/32deef6e/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java
----------------------------------------------------------------------
diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java
index 8a545a8..d4e5bbf 100644
--- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java
+++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServer.java
@@ -29,4 +29,5 @@ public class RibbonServer extends Server implements ServiceCallServer {
     public String getIp() {
         return getHost();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/32deef6e/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java
index 5e77c51..290149a 100644
--- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java
+++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallProcessor.java
@@ -16,13 +16,18 @@
  */
 package org.apache.camel.component.ribbon.processor;
 
+import java.util.Map;
 import java.util.concurrent.RejectedExecutionException;
 
+import com.netflix.client.config.IClientConfig;
+import com.netflix.client.config.IClientConfigKey;
+import com.netflix.loadbalancer.DummyPing;
 import com.netflix.loadbalancer.IRule;
-import com.netflix.loadbalancer.LoadBalancerBuilder;
+import com.netflix.loadbalancer.PollingServerListUpdater;
 import com.netflix.loadbalancer.RoundRobinRule;
 import com.netflix.loadbalancer.Server;
 import com.netflix.loadbalancer.ServerList;
+import com.netflix.loadbalancer.ServerListUpdater;
 import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -63,6 +68,7 @@ public class RibbonServiceCallProcessor extends ServiceSupport implements AsyncP
     private ZoneAwareLoadBalancer<RibbonServer> ribbonLoadBalancer;
     private IRule rule;
     private final RibbonServiceCallExpression serviceCallExpression;
+    private Map<String, String> ribbonClientConfig;
     private SendDynamicProcessor processor;
 
     public RibbonServiceCallProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, RibbonConfiguration configuration) {
@@ -169,6 +175,14 @@ public class RibbonServiceCallProcessor extends ServiceSupport implements AsyncP
         this.rule = rule;
     }
 
+    public Map<String, String> getRibbonClientConfig() {
+        return ribbonClientConfig;
+    }
+
+    public void setRibbonClientConfig(Map<String, String> ribbonClientConfig) {
+        this.ribbonClientConfig = ribbonClientConfig;
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     protected void doStart() throws Exception {
@@ -187,12 +201,21 @@ public class RibbonServiceCallProcessor extends ServiceSupport implements AsyncP
             rule = new RoundRobinRule();
         }
 
-        ribbonLoadBalancer = LoadBalancerBuilder.<RibbonServer>newBuilder()
-                .withDynamicServerList((ServerList<RibbonServer>) serverListStrategy)
-                .withRule(rule)
-                .buildDynamicServerListLoadBalancer();
+        // setup client config
+        IClientConfig config = IClientConfig.Builder.newBuilder().build();
+        if (ribbonClientConfig != null) {
+            for (Map.Entry<String, String> entry : ribbonClientConfig.entrySet()) {
+                IClientConfigKey key = IClientConfigKey.Keys.valueOf(entry.getKey());
+                String value = entry.getValue();
+                LOG.debug("RibbonClientConfig: {}={}", key.key(), value);
+                config.set(key, entry.getValue());
+            }
+        }
+
+        ServerListUpdater updater = new PollingServerListUpdater(config);
+        ribbonLoadBalancer = new ZoneAwareLoadBalancer<>(config, rule, new DummyPing(), (ServerList<RibbonServer>) serverListStrategy, null, updater);
 
-        LOG.info("RibbonServiceCall at namespace: {} with service name: {} is using load balancer: {} and service discovery: {}", namespace, name, ribbonLoadBalancer, serverListStrategy);
+        LOG.info("RibbonServiceCall at namespace: {} with service name: {} is using load balancer: {} and server list: {}", namespace, name, ribbonLoadBalancer, serverListStrategy);
 
         processor = new SendDynamicProcessor(uri, serviceCallExpression);
         processor.setCamelContext(getCamelContext());

http://git-wip-us.apache.org/repos/asf/camel/blob/32deef6e/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java
index 5891360..a836900 100644
--- a/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java
+++ b/components/camel-ribbon/src/main/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallStaticServerListStrategy.java
@@ -23,12 +23,25 @@ import com.netflix.client.config.IClientConfig;
 import com.netflix.loadbalancer.AbstractServerList;
 import com.netflix.loadbalancer.ServerList;
 import org.apache.camel.spi.ServiceCallServerListStrategy;
+import org.apache.camel.util.ObjectHelper;
 
 public class RibbonServiceCallStaticServerListStrategy extends AbstractServerList<RibbonServer> implements ServerList<RibbonServer>, ServiceCallServerListStrategy<RibbonServer> {
 
     private IClientConfig clientConfig;
     private final List<RibbonServer> servers = new ArrayList<>();
 
+    public static RibbonServiceCallStaticServerListStrategy build(String servers) {
+        RibbonServiceCallStaticServerListStrategy answer = new RibbonServiceCallStaticServerListStrategy();
+        String[] parts = servers.split(",");
+        for (String part : parts) {
+            String host = ObjectHelper.before(part, ":");
+            String port = ObjectHelper.after(part, ":");
+            int num = Integer.valueOf(port);
+            answer.addServer(host, num);
+        }
+        return answer;
+    }
+
     public RibbonServiceCallStaticServerListStrategy() {
     }
 
@@ -44,6 +57,10 @@ public class RibbonServiceCallStaticServerListStrategy extends AbstractServerLis
         servers.add(new RibbonServer(host, port));
     }
 
+    public void removeServer(String host, int port) {
+        servers.remove(new RibbonServer(host, port));
+    }
+
     @Override
     public void initWithNiwsConfig(IClientConfig clientConfig) {
         this.clientConfig = clientConfig;

http://git-wip-us.apache.org/repos/asf/camel/blob/32deef6e/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java
new file mode 100644
index 0000000..2ff22ac
--- /dev/null
+++ b/components/camel-ribbon/src/test/java/org/apache/camel/component/ribbon/processor/RibbonServiceCallUpdateRouteTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ribbon.processor;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class RibbonServiceCallUpdateRouteTest extends CamelTestSupport {
+
+    private final RibbonServiceCallStaticServerListStrategy servers = new RibbonServiceCallStaticServerListStrategy();
+
+    @Override
+    public void setUp() throws Exception {
+        // setup a static ribbon server list with these 2 servers to start with
+        servers.addServer("localhost", 9090);
+        servers.addServer("localhost", 9091);
+
+        super.setUp();
+    }
+
+    @Test
+    public void testServiceCall() throws Exception {
+        getMockEndpoint("mock:9090").expectedMessageCount(1);
+        getMockEndpoint("mock:9091").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(2);
+
+        String out = template.requestBody("direct:start", null, String.class);
+        String out2 = template.requestBody("direct:start", null, String.class);
+        assertEquals("9091", out);
+        assertEquals("9090", out2);
+
+        assertMockEndpointsSatisfied();
+
+        // stop the first server and remove it from the known list of servers
+        context.stopRoute("9090");
+        servers.removeServer("localhost", 9090);
+
+        // call the other active server
+        String out3 = template.requestBody("direct:start", null, String.class);
+        assertEquals("9091", out3);
+
+        // sleep a bit to let the server list updater run and detect that a server is no longer in the list
+        log.debug("Sleeping to all the server list updated to run");
+        Thread.sleep(1000);
+        log.debug("Calling the service now");
+
+        // call again and it should call 9091 as it's the only active server
+        String out4 = template.requestBody("direct:start", null, String.class);
+        assertEquals("9091", out4);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .serviceCall().name("myService")
+                            // let's update quickly so we do not have to sleep so much in the tests
+                            .serviceCallConfiguration().serverListStrategy(servers).clientProperty("ServerListRefreshInterval", "250").end()
+                        .to("mock:result");
+
+                from("jetty:http://localhost:9090").routeId("9090")
+                    .to("mock:9090")
+                    .transform().constant("9090");
+
+                from("jetty:http://localhost:9091").routeId("9091")
+                    .to("mock:9091")
+                    .transform().constant("9091");
+            }
+        };
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/32deef6e/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml b/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml
index a439f23..487fa0c 100644
--- a/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml
+++ b/components/camel-ribbon/src/test/resources/org/apache/camel/component/ribbon/processor/SpringRibbonServiceCallRouteTest.xml
@@ -22,21 +22,11 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-  <!-- TODO: should be nicer to do in XML-->
   <!-- setup a static ribbon server list with these 2 servers to start with -->
-  <bean id="servers" class="org.apache.camel.component.ribbon.processor.RibbonServiceCallStaticServerListStrategy">
-    <constructor-arg index="0">
-      <list>
-        <bean class="org.apache.camel.component.ribbon.processor.RibbonServer">
-          <constructor-arg index="0" value="localhost"/>
-          <constructor-arg index="1" value="9090"/>
-        </bean>
-        <bean class="org.apache.camel.component.ribbon.processor.RibbonServer">
-          <constructor-arg index="0" value="localhost"/>
-          <constructor-arg index="1" value="9091"/>
-        </bean>
-      </list>
-    </constructor-arg>
+  <!-- call the build method with the argument below -->
+  <bean id="servers" class="org.apache.camel.component.ribbon.processor.RibbonServiceCallStaticServerListStrategy"
+        factory-method="build">
+    <constructor-arg index="0" value="localhost:9090,localhost:9091"/>
   </bean>
 
   <camelContext xmlns="http://camel.apache.org/schema/spring">