You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/10 12:18:29 UTC

svn commit: r752068 - in /camel/trunk/components/camel-mina/src: main/java/org/apache/camel/component/mina/ test/java/org/apache/camel/component/mina/

Author: davsclaus
Date: Tue Mar 10 11:18:28 2009
New Revision: 752068

URL: http://svn.apache.org/viewvc?rev=752068&view=rev
Log:
CAMEL-1444: Potential problem with mina udp using the same camel context as both client and server.

Added:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java   (with props)
Modified:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Tue Mar 10 11:18:28 2009
@@ -19,34 +19,25 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.URI;
-import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoFilter;
 import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.LoggingFilter;
 import org.apache.mina.filter.codec.ProtocolCodecFactory;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
 import org.apache.mina.filter.codec.textline.LineDelimiter;
 import org.apache.mina.transport.socket.nio.DatagramAcceptor;
@@ -294,48 +285,8 @@
         ProtocolCodecFactory codecFactory = configuration.getCodec();
         if (codecFactory == null) {
             final Charset charset = getEncodingParameter(type, configuration);
-
-            // set the encoder used for this datagram codec factory
-            codecFactory = new ProtocolCodecFactory() {
-                public ProtocolEncoder getEncoder() throws Exception {
-                    return new ProtocolEncoder() {
-                        private CharsetEncoder encoder;
-
-                        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
-                            if (encoder == null) {
-                                encoder = charset.newEncoder();
-                            }
-                            ByteBuffer buf = toByteBuffer(message, encoder);
-                            buf.flip();
-                            out.write(buf);
-                        }
-
-                        public void dispose(IoSession session) throws Exception {
-                            // do nothing
-                        }
-                    };
-                }
-
-                public ProtocolDecoder getDecoder() throws Exception {
-                    return new ProtocolDecoder() {
-                        public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
-                            // must acquire the bytebuffer since we just pass it below instead of creating a new one (CAMEL-257)
-                            in.acquire();
-
-                            // lets just pass the ByteBuffer in
-                            out.write(in);
-                        }
-
-                        public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
-                            // do nothing
-                        }
-
-                        public void dispose(IoSession session) throws Exception {
-                            // do nothing
-                        }
-                    };
-                }
-            };
+            
+            codecFactory = new MinaUdpProtocolCodecFactory(getCamelContext(), charset);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug(type + ": Using CodecFactory: " + codecFactory + " using encoding: " + charset);
@@ -345,18 +296,6 @@
         addCodecFactory(config, codecFactory);
     }
 
-    private ByteBuffer toByteBuffer(Object message, CharsetEncoder encoder) throws CharacterCodingException {
-        ByteBuffer answer;
-        try {
-            answer = convertTo(ByteBuffer.class, message);
-        } catch (NoTypeConversionAvailableException e) {
-            String value = convertTo(String.class, message);
-            answer = ByteBuffer.allocate(value.length()).setAutoExpand(true);
-            answer.putString(value, encoder);
-        }
-        return answer;
-    }
-
     private void addCodecFactory(IoServiceConfig config, ProtocolCodecFactory codecFactory) {
         config.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
     }

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Tue Mar 10 11:18:28 2009
@@ -92,8 +92,14 @@
 
         @Override
         public void messageReceived(IoSession session, Object object) throws Exception {
+            // log what we received
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Received body: " + object);
+                Object in = object;
+                if (in instanceof byte[]) {
+                    // a byte array is not human-readable, so convert it to a String for logging
+                    in = endpoint.getCamelContext().getTypeConverter().convertTo(String.class, in);
+                }
+                LOG.debug("Received body: " + in);
             }
 
             Exchange exchange = endpoint.createExchange(session, object);

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java Tue Mar 10 11:18:28 2009
@@ -32,18 +32,14 @@
  */
 @Converter
 public final class MinaConverter {
+
     private MinaConverter() {
         //Utility Class
     }
+
     @Converter
     public static byte[] toByteArray(ByteBuffer buffer) {
         byte[] answer = new byte[buffer.remaining()];
-        try {
-            // must acquire the Byte buffer to avoid release if more than twice
-            buffer.acquire();
-        } catch (IllegalStateException ex) {
-            // catch the exception if we acquire the buffer which is already released.
-        }
         buffer.get(answer);
         return answer;
     }

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Tue Mar 10 11:18:28 2009
@@ -86,10 +86,16 @@
             handler.reset();
         }
 
-        // write the body
+        // log what we are writing
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Writing body: " + body);
+            Object out = body;
+            if (body instanceof byte[]) {
+                // a byte array is not human-readable, so convert it to a String for logging
+                out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
+            }
+            LOG.debug("Writing body : " + out);
         }
+        // write the body
         MinaHelper.writeBody(session, body, exchange);
 
         if (sync) {

Added: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java?rev=752068&view=auto
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java (added)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java Tue Mar 10 11:18:28 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+/**
+ * @version $Revision$
+ */
+public class MinaUdpProtocolCodecFactory implements ProtocolCodecFactory {
+
+    private final Charset charset;
+    private final CamelContext context;
+
+    public MinaUdpProtocolCodecFactory(CamelContext context, Charset charset) {
+        this.context = context;
+        this.charset = charset;
+    }
+
+    public ProtocolEncoder getEncoder() throws Exception {
+        return new ProtocolEncoder() {
+            private CharsetEncoder encoder;
+
+            public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
+                if (encoder == null) {
+                    encoder = charset.newEncoder();
+                }
+                ByteBuffer buf = toByteBuffer(message, encoder);
+                buf.flip();
+                out.write(buf);
+            }
+
+            public void dispose(IoSession session) throws Exception {
+                // do nothing
+            }
+        };
+    }
+
+    public ProtocolDecoder getDecoder() throws Exception {
+        return new ProtocolDecoder() {
+            public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+                // convert the buffer content to a byte array before writing it out; we cannot pass the
+                // byte buffer itself along, since it could be sent to multiple mina sessions
+                byte[] bytes = context.getTypeConverter().convertTo(byte[].class, in);
+                out.write(bytes);
+            }
+
+            public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
+                // do nothing
+            }
+
+            public void dispose(IoSession session) throws Exception {
+                // do nothing
+            }
+        };
+    }
+
+    private ByteBuffer toByteBuffer(Object message, CharsetEncoder encoder) throws CharacterCodingException {
+        ByteBuffer answer;
+        try {
+            answer = context.getTypeConverter().convertTo(ByteBuffer.class, message);
+        } catch (NoTypeConversionAvailableException e) {
+            String value = context.getTypeConverter().convertTo(String.class, message);
+            answer = ByteBuffer.allocate(value.length()).setAutoExpand(true);
+            answer.putString(value, encoder);
+        }
+        return answer;
+    }
+
+
+}

Propchange: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java Tue Mar 10 11:18:28 2009
@@ -123,6 +123,9 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
+                // use no delay for fast unit testing
+                errorHandler(deadLetterChannel().maximumRedeliveries(2).delay(0));
+
                 from(uri).process(new Processor() {
                     public void process(Exchange e) {
                         String in = e.getIn().getBody(String.class);

Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java Tue Mar 10 11:18:28 2009
@@ -22,7 +22,6 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultExchange;
 
 /**
  * To unit test CAMEL-364.
@@ -48,6 +47,9 @@
             public void configure() {
                 from(uri).process(new Processor() {
                     public void process(Exchange e) {
+                        // use no delay for fast unit testing
+                        errorHandler(deadLetterChannel().maximumRedeliveries(2).delay(0));
+
                         assertEquals("Hello World", e.getIn().getBody(String.class));
                         // simulate a problem processing the input to see if we can handle it properly
                         throw new IllegalArgumentException("Forced exception");

Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java Tue Mar 10 11:18:28 2009
@@ -26,7 +26,6 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.mina.common.ByteBuffer;
 
 /**
  * To test InOut exchange for the UDP protocol.
@@ -49,14 +48,14 @@
         byte[] data = input.getBytes();
 
         DatagramPacket packet = new DatagramPacket(data, data.length, address, PORT);
-        LOG.debug("Sending data");
+        LOG.debug("+++ Sending data +++");
         socket.send(packet);
 
         Thread.sleep(1000);
 
         byte[] buf = new byte[128];
         DatagramPacket receive = new DatagramPacket(buf, buf.length, address, PORT);
-        LOG.debug("Receving data");
+        LOG.debug("+++ Receving data +++");
         socket.receive(receive);
 
         socket.close();
@@ -69,8 +68,7 @@
             public void configure() {
                 from("mina:udp://127.0.0.1:" + PORT + "?sync=true").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        ByteBuffer in = exchange.getIn().getBody(ByteBuffer.class);
-                        String s = MinaConverter.toString(in, exchange);
+                        String s = exchange.getIn().getBody(String.class);
                         exchange.getOut().setBody("Hello " + s);
                     }
                 });