You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/04/19 06:52:23 UTC
svn commit: r935452 - in /camel/trunk/components/camel-netty/src:
main/java/org/apache/camel/component/netty/
test/java/org/apache/camel/component/netty/
Author: davsclaus
Date: Mon Apr 19 04:52:23 2010
New Revision: 935452
URL: http://svn.apache.org/viewvc?rev=935452&view=rev
Log:
CAMEL-2657: Multiple encoders and decoders are now possible. Thanks to Stephen Gargan for the patch.
Added:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java (with props)
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=935452&r1=935451&r2=935452&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Mon Apr 19 04:52:23 2010
@@ -16,14 +16,18 @@
*/
package org.apache.camel.component.netty;
+import java.util.List;
+
import javax.net.ssl.SSLEngine;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -50,9 +54,17 @@ public class ClientPipelineFactory imple
}
channelPipeline.addLast("ssl", sslHandler);
}
-
- channelPipeline.addLast("decoder", producer.getConfiguration().getDecoder());
- channelPipeline.addLast("encoder", producer.getConfiguration().getEncoder());
+
+ List<ChannelUpstreamHandler> decoders = producer.getConfiguration().getDecoders();
+ for (int x = 0; x < decoders.size(); x++) {
+ channelPipeline.addLast("decoder-" + x, decoders.get(x));
+ }
+
+ List<ChannelDownstreamHandler> encoders = producer.getConfiguration().getEncoders();
+ for (int x = 0; x < encoders.size(); x++) {
+ channelPipeline.addLast("encoder-" + x, encoders.get(x));
+ }
+
if (producer.getConfiguration().getHandler() != null) {
channelPipeline.addLast("handler", producer.getConfiguration().getHandler());
} else {
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=935452&r1=935451&r2=935452&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Mon Apr 19 04:52:23 2010
@@ -18,6 +18,8 @@ package org.apache.camel.component.netty
import java.io.File;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.camel.util.URISupport;
@@ -29,6 +31,7 @@ import org.jboss.netty.handler.codec.ser
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
+@SuppressWarnings("unchecked")
public class NettyConfiguration {
private String protocol;
private String host;
@@ -44,8 +47,8 @@ public class NettyConfiguration {
private File keyStoreFile;
private File trustStoreFile;
private SslHandler sslHandler;
- private ChannelDownstreamHandler encoder;
- private ChannelUpstreamHandler decoder;
+ private List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>();
+ private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
private ChannelHandler handler;
private boolean ssl;
private long sendBufferSize;
@@ -69,63 +72,72 @@ public class NettyConfiguration {
setCorePoolSize(10);
setMaxPoolSize(100);
}
-
+
public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception {
protocol = uri.getScheme();
-
+
if ((!protocol.equalsIgnoreCase("tcp")) && (!protocol.equalsIgnoreCase("udp"))) {
throw new IllegalArgumentException("Unrecognized Netty protocol: " + protocol + " for uri: " + uri);
}
-
+
setHost(uri.getHost());
setPort(uri.getPort());
-
+
sslHandler = component.resolveAndRemoveReferenceParameter(parameters, "sslHandler", SslHandler.class, null);
passphrase = component.resolveAndRemoveReferenceParameter(parameters, "passphrase", String.class, null);
- keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, "JKS");
+ keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, "JKS");
securityProvider = component.getAndRemoveParameter(parameters, "securityProvider", String.class, "SunX509");
keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, null);
trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, null);
- encoder = component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder());
- decoder = component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder());
- handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null);
-
+
+ List<ChannelDownstreamHandler> referencedEncoders = component.resolveAndRemoveReferenceParameter(parameters, "encoders", List.class, null);
+ addToHandlersList(encoders, referencedEncoders, ChannelDownstreamHandler.class);
+ List<ChannelUpstreamHandler> referencedDecoders = component.resolveAndRemoveReferenceParameter(parameters, "decoders", List.class, null);
+ addToHandlersList(decoders, referencedDecoders, ChannelUpstreamHandler.class);
+
+ if (encoders.isEmpty() && decoders.isEmpty()) {
+ encoders.add(component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder()));
+ decoders.add(component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder()));
+ }
+
+ handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null);
+
Map<String, Object> settings = URISupport.parseParameters(uri);
if (settings.containsKey("keepAlive")) {
setKeepAlive(Boolean.valueOf((String) settings.get("keepAlive")));
- }
+ }
if (settings.containsKey("tcpNoDelay")) {
setTcpNoDelay(Boolean.valueOf((String) settings.get("tcpNoDelay")));
- }
+ }
if (settings.containsKey("broadcast")) {
setBroadcast(Boolean.valueOf((String) settings.get("broadcast")));
- }
+ }
if (settings.containsKey("reuseAddress")) {
setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress")));
}
if (settings.containsKey("connectTimeoutMillis")) {
- setConnectTimeoutMillis(Long.valueOf((String)settings.get("connectTimeoutMillis")));
+ setConnectTimeoutMillis(Long.valueOf((String) settings.get("connectTimeoutMillis")));
}
if (settings.containsKey("sync")) {
setTcpNoDelay(Boolean.valueOf((String) settings.get("sync")));
}
if (settings.containsKey("receiveTimeoutMillis")) {
- setReceiveTimeoutMillis(Long.valueOf((String)settings.get("receiveTimeoutMillis")));
+ setReceiveTimeoutMillis(Long.valueOf((String) settings.get("receiveTimeoutMillis")));
}
if (settings.containsKey("sendBufferSize")) {
- setSendBufferSize(Long.valueOf((String)settings.get("sendBufferSize")));
+ setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize")));
}
if (settings.containsKey("receiveBufferSize")) {
- setReceiveBufferSize(Long.valueOf((String)settings.get("receiveBufferSize")));
- }
+ setReceiveBufferSize(Long.valueOf((String) settings.get("receiveBufferSize")));
+ }
if (settings.containsKey("ssl")) {
setTcpNoDelay(Boolean.valueOf((String) settings.get("ssl")));
}
if (settings.containsKey("corePoolSize")) {
- setCorePoolSize(Integer.valueOf((String)settings.get("corePoolSize")));
+ setCorePoolSize(Integer.valueOf((String) settings.get("corePoolSize")));
}
if (settings.containsKey("maxPoolSize")) {
- setMaxPoolSize(Integer.valueOf((String)settings.get("maxPoolSize")));
+ setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize")));
}
}
@@ -168,7 +180,7 @@ public class NettyConfiguration {
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
-
+
public boolean isBroadcast() {
return broadcast;
}
@@ -209,20 +221,41 @@ public class NettyConfiguration {
this.sslHandler = sslHandler;
}
+
+ public List<ChannelDownstreamHandler> getEncoders() {
+ return encoders;
+ }
+
+ public List<ChannelUpstreamHandler> getDecoders() {
+ return decoders;
+ }
+
public ChannelDownstreamHandler getEncoder() {
- return encoder;
+ return encoders.isEmpty() ? null : encoders.get(0);
}
public void setEncoder(ChannelDownstreamHandler encoder) {
- this.encoder = encoder;
+ if (!encoders.contains(encoder)) {
+ encoders.add(encoder);
+ }
+ }
+
+ public void setEncoders(List<ChannelDownstreamHandler> encoders) {
+ this.encoders = encoders;
}
public ChannelUpstreamHandler getDecoder() {
- return decoder;
+ return decoders.isEmpty() ? null : decoders.get(0);
}
public void setDecoder(ChannelUpstreamHandler decoder) {
- this.decoder = decoder;
+ if (!decoders.contains(decoder)) {
+ decoders.add(decoder);
+ }
+ }
+
+ public void setDecoders(List<ChannelUpstreamHandler> decoders) {
+ this.decoders = decoders;
}
public ChannelHandler getHandler() {
@@ -248,7 +281,7 @@ public class NettyConfiguration {
public void setSendBufferSize(long sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}
-
+
public boolean isSsl() {
return ssl;
}
@@ -319,6 +352,17 @@ public class NettyConfiguration {
public void setSecurityProvider(String securityProvider) {
this.securityProvider = securityProvider;
- }
+ }
+
+ private <T> void addToHandlersList(List configured, List handlers, Class<? extends T> handlerType) {
+ if (handlers != null) {
+ for (int x = 0; x < handlers.size(); x++) {
+ Object handler = handlers.get(x);
+ if (handlerType.isInstance(handler)) {
+ configured.add(handler);
+ }
+ }
+ }
+ }
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=935452&r1=935451&r2=935452&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Mon Apr 19 04:52:23 2010
@@ -16,14 +16,18 @@
*/
package org.apache.camel.component.netty;
+import java.util.List;
+
import javax.net.ssl.SSLEngine;
import org.apache.camel.component.netty.handlers.ServerChannelHandler;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -45,8 +49,15 @@ public class ServerPipelineFactory imple
}
channelPipeline.addLast("ssl", sslHandler);
}
- channelPipeline.addLast("decoder", consumer.getConfiguration().getDecoder());
- channelPipeline.addLast("encoder", consumer.getConfiguration().getEncoder());
+ List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders();
+ for (int x = 0; x < decoders.size(); x++) {
+ channelPipeline.addLast("decoder-" + x, decoders.get(x));
+ }
+
+ List<ChannelDownstreamHandler> encoders = consumer.getConfiguration().getEncoders();
+ for (int x = 0; x < encoders.size(); x++) {
+ channelPipeline.addLast("encoder-" + x, encoders.get(x));
+ }
if (consumer.getConfiguration().getHandler() != null) {
channelPipeline.addLast("handler", consumer.getConfiguration().getHandler());
} else {
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java?rev=935452&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java Mon Apr 19 04:52:23 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+import org.junit.Test;
+
+public class MultipleCodecsTest extends CamelTestSupport {
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+
+ List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
+ decoders.add(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
+ decoders.add(new StringDecoder());
+
+ List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>();
+ encoders.add(new LengthFieldPrepender(4));
+ encoders.add(new StringEncoder());
+
+ registry.bind("encoders", encoders);
+ registry.bind("decoders", decoders);
+
+ return registry;
+ }
+
+ @Test
+ public void canSupplyMultipleCodecsToEndpointPipeline() throws Exception {
+ String poem = new Poetry().getPoem();
+ MockEndpoint mock = getMockEndpoint("mock:multiple-codec");
+ mock.expectedBodiesReceived(poem);
+ sendBody("direct:mutliple-codec", poem);
+ mock.await(1, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:mutliple-codec").to("netty:tcp://localhost:5150?encoders=#encoders");
+
+ from("netty:tcp://localhost:5150?decoders=#decoders").to("mock:multiple-codec");
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date