You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/19 06:52:23 UTC

svn commit: r935452 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Mon Apr 19 04:52:23 2010
New Revision: 935452

URL: http://svn.apache.org/viewvc?rev=935452&view=rev
Log:
CAMEL-2657: Multiple encoders and decoders now possible. Thanks to Stephen Gargan for the patch.

Added:
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java   (with props)
Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=935452&r1=935451&r2=935452&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Mon Apr 19 04:52:23 2010
@@ -16,14 +16,18 @@
  */
 package org.apache.camel.component.netty;
 
+import java.util.List;
+
 import javax.net.ssl.SSLEngine;
 
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
 
@@ -50,9 +54,17 @@ public class ClientPipelineFactory imple
             }
             channelPipeline.addLast("ssl", sslHandler);            
         }
-        
-        channelPipeline.addLast("decoder", producer.getConfiguration().getDecoder());
-        channelPipeline.addLast("encoder", producer.getConfiguration().getEncoder());
+
+        List<ChannelUpstreamHandler> decoders = producer.getConfiguration().getDecoders();
+        for (int x = 0; x < decoders.size(); x++) {
+            channelPipeline.addLast("decoder-" + x, decoders.get(x));
+        }
+
+        List<ChannelDownstreamHandler> encoders = producer.getConfiguration().getEncoders();
+        for (int x = 0; x < encoders.size(); x++) {
+            channelPipeline.addLast("encoder-" + x, encoders.get(x));
+        }
+
         if (producer.getConfiguration().getHandler() != null) {
             channelPipeline.addLast("handler", producer.getConfiguration().getHandler());
         } else {

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=935452&r1=935451&r2=935452&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Mon Apr 19 04:52:23 2010
@@ -18,6 +18,8 @@ package org.apache.camel.component.netty
 
 import java.io.File;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.util.URISupport;
@@ -29,6 +31,7 @@ import org.jboss.netty.handler.codec.ser
 import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
 import org.jboss.netty.handler.ssl.SslHandler;
 
+@SuppressWarnings("unchecked")
 public class NettyConfiguration {
     private String protocol;
     private String host;
@@ -44,8 +47,8 @@ public class NettyConfiguration {
     private File keyStoreFile;
     private File trustStoreFile;
     private SslHandler sslHandler;
-    private ChannelDownstreamHandler encoder;
-    private ChannelUpstreamHandler decoder;
+    private List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>();
+    private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
     private ChannelHandler handler;
     private boolean ssl;
     private long sendBufferSize;
@@ -69,63 +72,72 @@ public class NettyConfiguration {
         setCorePoolSize(10);
         setMaxPoolSize(100);
     }
-    
+
     public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception {
         protocol = uri.getScheme();
-        
+
         if ((!protocol.equalsIgnoreCase("tcp")) && (!protocol.equalsIgnoreCase("udp"))) {
             throw new IllegalArgumentException("Unrecognized Netty protocol: " + protocol + " for uri: " + uri);
         }
-        
+
         setHost(uri.getHost());
         setPort(uri.getPort());
-     
+
         sslHandler = component.resolveAndRemoveReferenceParameter(parameters, "sslHandler", SslHandler.class, null);
         passphrase = component.resolveAndRemoveReferenceParameter(parameters, "passphrase", String.class, null);
-        keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, "JKS");        
+        keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, "JKS");
         securityProvider = component.getAndRemoveParameter(parameters, "securityProvider", String.class, "SunX509");
         keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, null);
         trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, null);
-        encoder = component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder());
-        decoder = component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder());
-        handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null);        
-        
+
+        List<ChannelDownstreamHandler> referencedEncoders = component.resolveAndRemoveReferenceParameter(parameters, "encoders", List.class, null);
+        addToHandlersList(encoders, referencedEncoders, ChannelDownstreamHandler.class);
+        List<ChannelUpstreamHandler> referencedDecoders = component.resolveAndRemoveReferenceParameter(parameters, "decoders", List.class, null);
+        addToHandlersList(decoders, referencedDecoders, ChannelUpstreamHandler.class);
+
+        if (encoders.isEmpty() && decoders.isEmpty()) {
+            encoders.add(component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder()));
+            decoders.add(component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder()));
+        }
+
+        handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null);
+
         Map<String, Object> settings = URISupport.parseParameters(uri);
         if (settings.containsKey("keepAlive")) {
             setKeepAlive(Boolean.valueOf((String) settings.get("keepAlive")));
-        }         
+        }
         if (settings.containsKey("tcpNoDelay")) {
             setTcpNoDelay(Boolean.valueOf((String) settings.get("tcpNoDelay")));
-        }        
+        }
         if (settings.containsKey("broadcast")) {
             setBroadcast(Boolean.valueOf((String) settings.get("broadcast")));
-        }            
+        }
         if (settings.containsKey("reuseAddress")) {
             setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress")));
         }
         if (settings.containsKey("connectTimeoutMillis")) {
-            setConnectTimeoutMillis(Long.valueOf((String)settings.get("connectTimeoutMillis")));
+            setConnectTimeoutMillis(Long.valueOf((String) settings.get("connectTimeoutMillis")));
         }
         if (settings.containsKey("sync")) {
             setTcpNoDelay(Boolean.valueOf((String) settings.get("sync")));
         }
         if (settings.containsKey("receiveTimeoutMillis")) {
-            setReceiveTimeoutMillis(Long.valueOf((String)settings.get("receiveTimeoutMillis")));
+            setReceiveTimeoutMillis(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
         }
         if (settings.containsKey("sendBufferSize")) {
-            setSendBufferSize(Long.valueOf((String)settings.get("sendBufferSize")));
+            setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize")));
         }
         if (settings.containsKey("receiveBufferSize")) {
-            setReceiveBufferSize(Long.valueOf((String)settings.get("receiveBufferSize")));
-        }        
+            setReceiveBufferSize(Long.valueOf((String) settings.get("receiveBufferSize")));
+        }
         if (settings.containsKey("ssl")) {
             setTcpNoDelay(Boolean.valueOf((String) settings.get("ssl")));
         }
         if (settings.containsKey("corePoolSize")) {
-            setCorePoolSize(Integer.valueOf((String)settings.get("corePoolSize")));
+            setCorePoolSize(Integer.valueOf((String) settings.get("corePoolSize")));
         }
         if (settings.containsKey("maxPoolSize")) {
-            setMaxPoolSize(Integer.valueOf((String)settings.get("maxPoolSize")));
+            setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize")));
         }
     }
 
@@ -168,7 +180,7 @@ public class NettyConfiguration {
     public void setTcpNoDelay(boolean tcpNoDelay) {
         this.tcpNoDelay = tcpNoDelay;
     }
-   
+
     public boolean isBroadcast() {
         return broadcast;
     }
@@ -209,20 +221,41 @@ public class NettyConfiguration {
         this.sslHandler = sslHandler;
     }
 
+
+    public List<ChannelDownstreamHandler> getEncoders() {
+        return encoders;
+    }
+
+    public List<ChannelUpstreamHandler> getDecoders() {
+        return decoders;
+    }
+
     public ChannelDownstreamHandler getEncoder() {
-        return encoder;
+        return encoders.isEmpty() ? null : encoders.get(0);
     }
 
     public void setEncoder(ChannelDownstreamHandler encoder) {
-        this.encoder = encoder;
+        if (!encoders.contains(encoder)) {
+            encoders.add(encoder);
+        }
+    }
+
+    public void setEncoders(List<ChannelDownstreamHandler> encoders) {
+        this.encoders = encoders;
     }
 
     public ChannelUpstreamHandler getDecoder() {
-        return decoder;
+        return decoders.isEmpty() ? null : decoders.get(0);
     }
 
     public void setDecoder(ChannelUpstreamHandler decoder) {
-        this.decoder = decoder;
+        if (!decoders.contains(decoder)) {
+            decoders.add(decoder);
+        }
+    }
+
+    public void setDecoders(List<ChannelUpstreamHandler> decoders) {
+        this.decoders = decoders;
     }
 
     public ChannelHandler getHandler() {
@@ -248,7 +281,7 @@ public class NettyConfiguration {
     public void setSendBufferSize(long sendBufferSize) {
         this.sendBufferSize = sendBufferSize;
     }
-    
+
     public boolean isSsl() {
         return ssl;
     }
@@ -319,6 +352,17 @@ public class NettyConfiguration {
 
     public void setSecurityProvider(String securityProvider) {
         this.securityProvider = securityProvider;
-    }    
+    }
+
+    private <T> void addToHandlersList(List configured, List handlers, Class<? extends T> handlerType) {
+        if (handlers != null) {
+            for (int x = 0; x < handlers.size(); x++) {
+                Object handler = handlers.get(x);
+                if (handlerType.isInstance(handler)) {
+                    configured.add(handler);
+                }
+            }
+        }
+    }
 
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=935452&r1=935451&r2=935452&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Mon Apr 19 04:52:23 2010
@@ -16,14 +16,18 @@
  */
 package org.apache.camel.component.netty;
 
+import java.util.List;
+
 import javax.net.ssl.SSLEngine;
 
 import org.apache.camel.component.netty.handlers.ServerChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
 
@@ -45,8 +49,15 @@ public class ServerPipelineFactory imple
             }
             channelPipeline.addLast("ssl", sslHandler);            
         }
-        channelPipeline.addLast("decoder", consumer.getConfiguration().getDecoder());
-        channelPipeline.addLast("encoder", consumer.getConfiguration().getEncoder());
+        List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders();
+        for (int x = 0; x < decoders.size(); x++) {
+            channelPipeline.addLast("decoder-" + x, decoders.get(x));
+        }
+
+        List<ChannelDownstreamHandler> encoders = consumer.getConfiguration().getEncoders();
+        for (int x = 0; x < encoders.size(); x++) {
+            channelPipeline.addLast("encoder-" + x, encoders.get(x));
+        }
         if (consumer.getConfiguration().getHandler() != null) {
             channelPipeline.addLast("handler", consumer.getConfiguration().getHandler());
         } else {

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java?rev=935452&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java Mon Apr 19 04:52:23 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+import org.junit.Test;
+
+public class MultipleCodecsTest extends CamelTestSupport {
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
+        decoders.add(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
+        decoders.add(new StringDecoder());
+
+        List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>();
+        encoders.add(new LengthFieldPrepender(4));
+        encoders.add(new StringEncoder());
+
+        registry.bind("encoders", encoders);
+        registry.bind("decoders", decoders);
+
+        return registry;
+    }
+
+    @Test
+    public void canSupplyMultipleCodecsToEndpointPipeline() throws Exception {
+        String poem = new Poetry().getPoem();
+        MockEndpoint mock = getMockEndpoint("mock:multiple-codec");
+        mock.expectedBodiesReceived(poem);
+        sendBody("direct:mutliple-codec", poem);
+        mock.await(1, TimeUnit.SECONDS);
+        mock.assertIsSatisfied();
+
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:mutliple-codec").to("netty:tcp://localhost:5150?encoders=#encoders");
+
+                from("netty:tcp://localhost:5150?decoders=#decoders").to("mock:multiple-codec");
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date