You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/10 12:18:29 UTC
svn commit: r752068 - in /camel/trunk/components/camel-mina/src:
main/java/org/apache/camel/component/mina/
test/java/org/apache/camel/component/mina/
Author: davsclaus
Date: Tue Mar 10 11:18:28 2009
New Revision: 752068
URL: http://svn.apache.org/viewvc?rev=752068&view=rev
Log:
CAMEL-1444: Potential problem with mina udp using the same camel context as both client and server.
Added:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java (with props)
Modified:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Tue Mar 10 11:18:28 2009
@@ -19,34 +19,25 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
-import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
import java.util.List;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.apache.camel.util.ObjectHelper;
import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoder;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
@@ -294,48 +285,8 @@
ProtocolCodecFactory codecFactory = configuration.getCodec();
if (codecFactory == null) {
final Charset charset = getEncodingParameter(type, configuration);
-
- // set the encoder used for this datagram codec factory
- codecFactory = new ProtocolCodecFactory() {
- public ProtocolEncoder getEncoder() throws Exception {
- return new ProtocolEncoder() {
- private CharsetEncoder encoder;
-
- public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
- if (encoder == null) {
- encoder = charset.newEncoder();
- }
- ByteBuffer buf = toByteBuffer(message, encoder);
- buf.flip();
- out.write(buf);
- }
-
- public void dispose(IoSession session) throws Exception {
- // do nothing
- }
- };
- }
-
- public ProtocolDecoder getDecoder() throws Exception {
- return new ProtocolDecoder() {
- public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
- // must acquire the bytebuffer since we just pass it below instead of creating a new one (CAMEL-257)
- in.acquire();
-
- // lets just pass the ByteBuffer in
- out.write(in);
- }
-
- public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
- // do nothing
- }
-
- public void dispose(IoSession session) throws Exception {
- // do nothing
- }
- };
- }
- };
+
+ codecFactory = new MinaUdpProtocolCodecFactory(getCamelContext(), charset);
if (LOG.isDebugEnabled()) {
LOG.debug(type + ": Using CodecFactory: " + codecFactory + " using encoding: " + charset);
@@ -345,18 +296,6 @@
addCodecFactory(config, codecFactory);
}
- private ByteBuffer toByteBuffer(Object message, CharsetEncoder encoder) throws CharacterCodingException {
- ByteBuffer answer;
- try {
- answer = convertTo(ByteBuffer.class, message);
- } catch (NoTypeConversionAvailableException e) {
- String value = convertTo(String.class, message);
- answer = ByteBuffer.allocate(value.length()).setAutoExpand(true);
- answer.putString(value, encoder);
- }
- return answer;
- }
-
private void addCodecFactory(IoServiceConfig config, ProtocolCodecFactory codecFactory) {
config.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
}
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Tue Mar 10 11:18:28 2009
@@ -92,8 +92,14 @@
@Override
public void messageReceived(IoSession session, Object object) throws Exception {
+ // log what we received
if (LOG.isDebugEnabled()) {
- LOG.debug("Received body: " + object);
+ Object in = object;
+ if (in instanceof byte[]) {
+ // byte arrays is not readable so convert to string
+ in = endpoint.getCamelContext().getTypeConverter().convertTo(String.class, in);
+ }
+ LOG.debug("Received body: " + in);
}
Exchange exchange = endpoint.createExchange(session, object);
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConverter.java Tue Mar 10 11:18:28 2009
@@ -32,18 +32,14 @@
*/
@Converter
public final class MinaConverter {
+
private MinaConverter() {
//Utility Class
}
+
@Converter
public static byte[] toByteArray(ByteBuffer buffer) {
byte[] answer = new byte[buffer.remaining()];
- try {
- // must acquire the Byte buffer to avoid release if more than twice
- buffer.acquire();
- } catch (IllegalStateException ex) {
- // catch the exception if we acquire the buffer which is already released.
- }
buffer.get(answer);
return answer;
}
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Tue Mar 10 11:18:28 2009
@@ -86,10 +86,16 @@
handler.reset();
}
- // write the body
+ // log what we are writing
if (LOG.isDebugEnabled()) {
- LOG.debug("Writing body: " + body);
+ Object out = body;
+ if (body instanceof byte[]) {
+ // byte arrays is not readable so convert to string
+ out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
+ }
+ LOG.debug("Writing body : " + out);
}
+ // write the body
MinaHelper.writeBody(session, body, exchange);
if (sync) {
Added: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java?rev=752068&view=auto
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java (added)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java Tue Mar 10 11:18:28 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+/**
+ * @version $Revision$
+ */
+public class MinaUdpProtocolCodecFactory implements ProtocolCodecFactory {
+
+ private final Charset charset;
+ private final CamelContext context;
+
+ public MinaUdpProtocolCodecFactory(CamelContext context, Charset charset) {
+ this.context = context;
+ this.charset = charset;
+ }
+
+ public ProtocolEncoder getEncoder() throws Exception {
+ return new ProtocolEncoder() {
+ private CharsetEncoder encoder;
+
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
+ if (encoder == null) {
+ encoder = charset.newEncoder();
+ }
+ ByteBuffer buf = toByteBuffer(message, encoder);
+ buf.flip();
+ out.write(buf);
+ }
+
+ public void dispose(IoSession session) throws Exception {
+ // do nothing
+ }
+ };
+ }
+
+ public ProtocolDecoder getDecoder() throws Exception {
+ return new ProtocolDecoder() {
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
+ // convert to bytes to write, we can not pass in the byte buffer as it could be sent to
+ // multiple mina sessions so we must convert it to bytes
+ byte[] bytes = context.getTypeConverter().convertTo(byte[].class, in);
+ out.write(bytes);
+ }
+
+ public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
+ // do nothing
+ }
+
+ public void dispose(IoSession session) throws Exception {
+ // do nothing
+ }
+ };
+ }
+
+ private ByteBuffer toByteBuffer(Object message, CharsetEncoder encoder) throws CharacterCodingException {
+ ByteBuffer answer;
+ try {
+ answer = context.getTypeConverter().convertTo(ByteBuffer.class, message);
+ } catch (NoTypeConversionAvailableException e) {
+ String value = context.getTypeConverter().convertTo(String.class, message);
+ answer = ByteBuffer.allocate(value.length()).setAutoExpand(true);
+ answer.putString(value, encoder);
+ }
+ return answer;
+ }
+
+
+}
Propchange: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaUdpProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpLineDelimiterUsingPlainSocketTest.java Tue Mar 10 11:18:28 2009
@@ -123,6 +123,9 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
+ // use no delay for fast unit testing
+ errorHandler(deadLetterChannel().maximumRedeliveries(2).delay(0));
+
from(uri).process(new Processor() {
public void process(Exchange e) {
String in = e.getIn().getBody(String.class);
Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithIoOutProcessorExceptionTest.java Tue Mar 10 11:18:28 2009
@@ -22,7 +22,6 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.DefaultExchange;
/**
* To unit test CAMEL-364.
@@ -48,6 +47,9 @@
public void configure() {
from(uri).process(new Processor() {
public void process(Exchange e) {
+ // use no delay for fast unit testing
+ errorHandler(deadLetterChannel().maximumRedeliveries(2).delay(0));
+
assertEquals("Hello World", e.getIn().getBody(String.class));
// simulate a problem processing the input to see if we can handle it properly
throw new IllegalArgumentException("Forced exception");
Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java?rev=752068&r1=752067&r2=752068&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaUdpWithInOutUsingPlainSocketTest.java Tue Mar 10 11:18:28 2009
@@ -26,7 +26,6 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.mina.common.ByteBuffer;
/**
* To test InOut exchange for the UDP protocol.
@@ -49,14 +48,14 @@
byte[] data = input.getBytes();
DatagramPacket packet = new DatagramPacket(data, data.length, address, PORT);
- LOG.debug("Sending data");
+ LOG.debug("+++ Sending data +++");
socket.send(packet);
Thread.sleep(1000);
byte[] buf = new byte[128];
DatagramPacket receive = new DatagramPacket(buf, buf.length, address, PORT);
- LOG.debug("Receving data");
+ LOG.debug("+++ Receving data +++");
socket.receive(receive);
socket.close();
@@ -69,8 +68,7 @@
public void configure() {
from("mina:udp://127.0.0.1:" + PORT + "?sync=true").process(new Processor() {
public void process(Exchange exchange) throws Exception {
- ByteBuffer in = exchange.getIn().getBody(ByteBuffer.class);
- String s = MinaConverter.toString(in, exchange);
+ String s = exchange.getIn().getBody(String.class);
exchange.getOut().setBody("Hello " + s);
}
});