You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 16:38:11 UTC

svn commit: r941756 - in /camel/trunk/components: camel-mina/src/main/java/org/apache/camel/component/mina/ camel-mina/src/test/java/org/apache/camel/component/mina/ camel-netty/src/main/java/org/apache/camel/component/netty/ camel-netty/src/main/java/...

Author: davsclaus
Date: Thu May  6 14:38:10 2010
New Revision: 941756

URL: http://svn.apache.org/viewvc?rev=941756&view=rev
Log:
Added transferExchange option to camel-netty.

Added:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java   (with props)
Modified:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java Thu May  6 14:38:10 2010
@@ -20,7 +20,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultExchangeHolder;
 
 /**
- * Helper to get and set the correct payload when transfering data using camel-mina.
+ * Helper to get and set the correct payload when transferring data using camel-mina.
  * Always use this helper instead of direct access on the exchange object.
  * <p/>
  * This helper ensures that we can also transfer exchange objects over the wire using the

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Thu May  6 14:38:10 2010
@@ -243,7 +243,7 @@ public class MinaProducer extends Defaul
 
         @Override
         public void sessionClosed(IoSession session) throws Exception {
-            if (sync && message == null) {
+            if (sync && !messageReceived) {
                 // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Session closed but no message received from address: " + this.endpoint.getAddress());

Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java Thu May  6 14:38:10 2010
@@ -38,7 +38,7 @@ public class MinaTransferExchangeOptionT
 
     protected String uri = "mina:tcp://localhost:6321?sync=true&encoding=UTF-8&transferExchange=true";
 
-    public void testMianTransferExchangeOptionWithoutException() throws Exception {
+    public void testMinaTransferExchangeOptionWithoutException() throws Exception {
         Exchange exchange = sendExchange(false);
         assertExchange(exchange, false);
     }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May  6 14:38:10 2010
@@ -59,6 +59,7 @@ public class NettyConfiguration {
     private String securityProvider;
     private boolean disconnect;
     private boolean lazyChannelCreation = true;
+    private boolean transferExchange;
 
     public NettyConfiguration() {
         setKeepAlive(true);
@@ -148,6 +149,9 @@ public class NettyConfiguration {
         if (settings.containsKey("lazyChannelCreation")) {
             setLazyChannelCreation(Boolean.valueOf((String) settings.get("lazyChannelCreation")));
         }
+        if (settings.containsKey("transferExchange")) {
+            setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange")));
+        }
     }
 
     public String getProtocol() {
@@ -378,6 +382,14 @@ public class NettyConfiguration {
         this.lazyChannelCreation = lazyChannelCreation;
     }
 
+    public boolean isTransferExchange() {
+        return transferExchange;
+    }
+
+    public void setTransferExchange(boolean transferExchange) {
+        this.transferExchange = transferExchange;
+    }
+
     public String getAddress() {
         return host + ":" + port;
     }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java Thu May  6 14:38:10 2010
@@ -26,6 +26,7 @@ public final class NettyConstants {
     public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete";
     public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext";
     public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
+    public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress";
 
     private NettyConstants() {
         // Utility class

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Thu May  6 14:38:10 2010
@@ -45,7 +45,9 @@ public class NettyEndpoint extends Defau
         Exchange exchange = createExchange();
         exchange.getIn().setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
         exchange.getIn().setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
-        return exchange;        
+        exchange.getIn().setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, messageEvent.getRemoteAddress());
+        NettyPayloadHelper.setIn(exchange, messageEvent.getMessage());
+        return exchange;
     }
     
     public boolean isSingleton() {

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java?rev=941756&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java Thu May  6 14:38:10 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+
+/**
+ * Helper to get and set the correct payload when transferring data using camel-netty.
+ * Always use this helper instead of direct access on the exchange object.
+ * <p/>
+ * This helper ensures that we can also transfer exchange objects over the wire using the
+ * <tt>transferExchange=true</tt> option.
+ *
+ * @version $Revision$
+ */
+public final class NettyPayloadHelper {
+
+    public static Object getIn(NettyEndpoint endpoint, Exchange exchange) {
+        if (endpoint.getConfiguration().isTransferExchange()) {
+            // we should transfer the entire exchange over the wire (includes in/out)
+            return DefaultExchangeHolder.marshal(exchange);
+        } else {
+            // normal transfer using the body only
+            return exchange.getIn().getBody();
+        }
+    }
+
+    public static Object getOut(NettyEndpoint endpoint, Exchange exchange) {
+        if (endpoint.getConfiguration().isTransferExchange()) {
+            // we should transfer the entire exchange over the wire (includes in/out)
+            return DefaultExchangeHolder.marshal(exchange);
+        } else {
+            // normal transfer using the body only
+            return exchange.getOut().getBody();
+        }
+    }
+
+    public static void setIn(Exchange exchange, Object payload) {
+        if (payload instanceof DefaultExchangeHolder) {
+            DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+        } else {
+            // normal transfer using the body only
+            exchange.getIn().setBody(payload);
+        }
+    }
+
+    public static void setOut(Exchange exchange, Object payload) {
+        if (payload instanceof DefaultExchangeHolder) {
+            DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+        } else {
+            // normal transfer using the body only and preserve the headers
+            exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+            exchange.getOut().setBody(payload);
+        }
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May  6 14:38:10 2010
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.ServicePoolAware;
@@ -108,20 +109,50 @@ public class NettyProducer extends Defau
             openConnection();
         }
 
+        Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
+        if (body == null) {
+            LOG.warn("No payload to send for exchange: " + exchange);
+            return; // exit early since nothing to write
+        }
+
         if (configuration.isSync()) {
+            // only initialize latch if we should get a response
             countdownLatch = new CountDownLatch(1);
         }
 
+        // log what we are writing
+        if (LOG.isDebugEnabled()) {
+            Object out = body;
+            if (body instanceof byte[]) {
+                // byte arrays are not readable so convert to string
+                out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
+            }
+            LOG.debug("Writing body : " + out);
+        }
+
         // write the body
-        NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange);
+        NettyHelper.writeBody(channel, null, body, exchange);
 
         if (configuration.isSync()) {
             boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS);
             if (!success) {
                 throw new ExchangeTimedOutException(exchange, configuration.getReceiveTimeoutMillis());
             }
-            Object response = ((ClientChannelHandler) clientPipeline.get("handler")).getResponse();
-            exchange.getOut().setBody(response);
+
+            ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler");
+            if (handler.getCause() != null) {
+                throw new CamelExchangeException("Error occurred in ClientChannelHandler", exchange, handler.getCause());
+            } else if (!handler.isMessageReceived()) {
+                // no message received
+                throw new CamelExchangeException("No response received from remote server: " + configuration.getAddress(), exchange);
+            } else {
+                // set the result on either IN or OUT on the original exchange depending on its pattern
+                if (ExchangeHelper.isOutCapable(exchange)) {
+                    NettyPayloadHelper.setOut(exchange, handler.getMessage());
+                } else {
+                    NettyPayloadHelper.setIn(exchange, handler.getMessage());
+                }
+            }
         }
 
         // should channel be closed after complete?

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May  6 14:38:10 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.netty.handlers;
 
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.camel.CamelException;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.NettyProducer;
@@ -32,13 +34,21 @@ import org.jboss.netty.channel.SimpleCha
 public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Log LOG = LogFactory.getLog(ClientChannelHandler.class);
     private NettyProducer producer;
-    private Object response;
-    
+    private Object message;
+    private Throwable cause;
+    private boolean messageReceived;
+
     public ClientChannelHandler(NettyProducer producer) {
         super();
         this.producer = producer;
     }
 
+    public void reset() {
+        this.message = null;
+        this.cause = null;
+        this.messageReceived = false;
+    }
+
     @Override
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
         // to keep track of open sockets
@@ -47,35 +57,59 @@ public class ClientChannelHandler extend
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
+        this.message = null;
+        this.messageReceived = false;
+        this.cause = exceptionEvent.getCause();
+
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
+            LOG.debug("Closing channel as an exception was thrown from Netty", cause);
         }
         // close channel in case an exception was thrown
         NettyHelper.close(exceptionEvent.getChannel());
+    }
 
-        // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
-        throw new CamelException(exceptionEvent.getCause());
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        if (producer.getConfiguration().isSync() && !messageReceived) {
+            // sync=true (InOut mode) so we expected a message as reply but did not get one before the channel is closed
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Channel closed but no message received from address: " + producer.getConfiguration().getAddress());
+            }
+            // channel was closed but no message received. This could be because the remote server had an internal error
+            // and could not return a response. We should count down to stop waiting for a response
+            countDown();
+        }
     }
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
-        setResponse(messageEvent.getMessage());
+        message = messageEvent.getMessage();
+        messageReceived = true;
+        cause = null;
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Incoming message:" + response);
+            LOG.debug("Message received: " + message);
         }
 
+        // signal we have received message
+        countDown();
+    }
+
+    protected void countDown() {
         if (producer.getConfiguration().isSync()) {
             producer.getCountdownLatch().countDown();
-        }        
+        }
+    }
+
+    public Object getMessage() {
+        return message;
     }
 
-    public Object getResponse() {
-        return response;
+    public boolean isMessageReceived() {
+        return messageReceived;
     }
 
-    public void setResponse(Object response) {
-        this.response = response;
+    public Throwable getCause() {
+        return cause;
     }
-    
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May  6 14:38:10 2010
@@ -22,6 +22,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.component.netty.NettyConsumer;
 import org.apache.camel.component.netty.NettyHelper;
+import org.apache.camel.component.netty.NettyPayloadHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -76,7 +77,6 @@ public class ServerChannelHandler extend
         if (consumer.getConfiguration().isSync()) {
             exchange.setPattern(ExchangePattern.InOut);
         }
-        exchange.getIn().setBody(in);
 
         try {
             consumer.getProcessor().process(exchange);
@@ -93,17 +93,18 @@ public class ServerChannelHandler extend
     private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception {
         Object body;
         if (ExchangeHelper.isOutCapable(exchange)) {
-            body = exchange.getOut().getBody();
+            body = NettyPayloadHelper.getOut(consumer.getEndpoint(), exchange);
         } else {
-            body = exchange.getIn().getBody();
+            body = NettyPayloadHelper.getIn(consumer.getEndpoint(), exchange);
         }
 
-        if (exchange.isFailed()) {
-            if (exchange.getException() == null) {
-                // fault detected
-                body = exchange.getOut().getBody();
-            } else {
+        boolean failed = exchange.isFailed();
+        if (failed && !consumer.getEndpoint().getConfiguration().isTransferExchange()) {
+            if (exchange.getException() != null) {
                 body = exchange.getException();
+            } else {
+                // failed and no exception, must be a fault
+                body = exchange.getOut().getBody();
             }
         }
 

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=941756&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Thu May  6 14:38:10 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyInOutWithForcedNoResponseTest extends CamelTestSupport {
+
+    @Test
+    public void testResponse() throws Exception {
+        Object out = template.requestBody("netty:tcp://localhost:4444", "Copenhagen");
+        assertEquals("Hello Claus", out);
+    }
+
+    @Test
+    public void testNoResponse() throws Exception {
+        try {
+            template.requestBody("netty:tcp://localhost:4444", "London");
+            fail("Should throw an exception");
+        } catch (RuntimeCamelException e) {
+            assertTrue(e.getCause().getMessage().startsWith("No response"));
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("netty:tcp://localhost:4444")
+                    .choice()
+                        .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
+                        .otherwise().transform(constant(null));
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java?rev=941756&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java Thu May  6 14:38:10 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.nio.charset.Charset;
+
+import junit.framework.Assert;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTransferExchangeOptionTest extends CamelTestSupport {
+
+    protected String uri = "netty:tcp://localhost:6321?transferExchange=true";
+
+    @Test
+    public void testNettyTransferExchangeOptionWithoutException() throws Exception {
+        Exchange exchange = sendExchange(false);
+        assertExchange(exchange, false);
+    }
+
+    @Test
+    public void testNettyTransferExchangeOptionWithException() throws Exception {
+        Exchange exchange = sendExchange(true);
+        assertExchange(exchange, true);
+    }
+
+    private Exchange sendExchange(boolean setException) throws Exception {
+        Endpoint endpoint = context.getEndpoint(uri);
+        Exchange exchange = endpoint.createExchange();
+
+        Message message = exchange.getIn();
+        message.setBody("Hello!");
+        message.setHeader("cheese", "feta");
+        exchange.setProperty("ham", "old");
+        exchange.setProperty("setException", setException);
+
+        Producer producer = endpoint.createProducer();
+        producer.start();
+        producer.process(exchange);
+
+        return exchange;
+    }
+
+    private void assertExchange(Exchange exchange, boolean hasFault) {
+        if (!hasFault) {
+            Message out = exchange.getOut();
+            assertNotNull(out);
+            assertFalse(out.isFault());
+            assertEquals("Goodbye!", out.getBody());
+            assertEquals("cheddar", out.getHeader("cheese"));
+        } else {
+            Message fault = exchange.getOut();
+            assertNotNull(fault);
+            assertTrue(fault.isFault());
+            assertNotNull(fault.getBody());
+            assertTrue("Should get the InterrupteException exception", fault.getBody() instanceof InterruptedException);
+            assertEquals("nihao", fault.getHeader("hello"));
+        }
+
+
+        // in should stay the same
+        Message in = exchange.getIn();
+        assertNotNull(in);
+        assertEquals("Hello!", in.getBody());
+        assertEquals("feta", in.getHeader("cheese"));
+        // however the shared properties have changed
+        assertEquals("fresh", exchange.getProperty("salami"));
+        assertNull(exchange.getProperty("Charset"));
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).process(new Processor() {
+                    public void process(Exchange e) throws InterruptedException {
+                        Assert.assertNotNull(e.getIn().getBody());
+                        Assert.assertNotNull(e.getIn().getHeaders());
+                        Assert.assertNotNull(e.getProperties());
+                        Assert.assertEquals("Hello!", e.getIn().getBody());
+                        Assert.assertEquals("feta", e.getIn().getHeader("cheese"));
+                        Assert.assertEquals("old", e.getProperty("ham"));
+                        Assert.assertEquals(ExchangePattern.InOut, e.getPattern());
+                        Boolean setException = (Boolean) e.getProperty("setException");
+
+                        if (setException) {
+                            e.getOut().setFault(true);
+                            e.getOut().setBody(new InterruptedException());
+                            e.getOut().setHeader("hello", "nihao");
+                        } else {
+                            e.getOut().setBody("Goodbye!");
+                            e.getOut().setHeader("cheese", "cheddar");
+                        }
+                        e.setProperty("salami", "fresh");
+                        e.setProperty("Charset", Charset.defaultCharset());
+                    }
+                });
+            }
+        };
+    }
+}
+
+

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date