You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/02/28 09:01:09 UTC

svn commit: r631882 - in /activemq/camel/trunk/components/camel-mina/src: main/java/org/apache/camel/component/mina/ test/java/org/apache/camel/component/mina/

Author: ningjiang
Date: Thu Feb 28 00:01:08 2008
New Revision: 631882

URL: http://svn.apache.org/viewvc?rev=631882&view=rev
Log:
CAMEL-340 applied the patch with thanks to Claus

Added:
    activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java   (with props)
Modified:
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=631882&r1=631881&r2=631882&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Thu Feb 28 00:01:08 2008
@@ -27,6 +27,7 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.mina.common.ByteBuffer;
@@ -53,6 +54,7 @@
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
 
 /**
+ * The component for using the Mina library
  * @version $Revision$
  */
 public class MinaComponent extends DefaultComponent<MinaExchange> {
@@ -99,7 +101,16 @@
         // TODO customize the config via URI
         SocketConnectorConfig config = new SocketConnectorConfig();
         configureSocketCodecFactory(config, parameters);
-        return new MinaEndpoint(uri, this, address, acceptor, connector, config);
+        MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, connector, config);
+
+        boolean sync = ObjectConverter.toBool(parameters.get("sync"));
+        if (sync) {
+            endpoint.setExchangePattern(ExchangePattern.InOut);
+        } else {
+            endpoint.setExchangePattern(ExchangePattern.InOnly);
+        }
+
+        return endpoint;
     }
 
     protected void configureSocketCodecFactory(BaseIoConnectorConfig config, Map parameters) {

Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=631882&r1=631881&r2=631882&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Thu Feb 28 00:01:08 2008
@@ -16,10 +16,9 @@
  */
 package org.apache.camel.component.mina;
 
-import java.net.SocketAddress;
-
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.mina.common.IoAcceptor;
@@ -27,10 +26,10 @@
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 
+import java.net.SocketAddress;
+
 /**
- * A
- * 
- * @{link Consumer} for MINA
+ * A {@link Consumer} implementation for MINA
  * @version $Revision$
  */
 public class MinaConsumer extends DefaultConsumer<MinaExchange> {
@@ -57,8 +56,24 @@
         IoHandler handler = new IoHandlerAdapter() {
             @Override
             public void messageReceived(IoSession session, Object object) throws Exception {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Received body: " + object);
+                }
+
                 MinaExchange exchange = endpoint.createExchange(session, object);
                 getProcessor().process(exchange);
+
+                if (ExchangeHelper.isOutCapable(exchange)) {
+                    Object body = exchange.getOut().getBody();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Writing body: " + body);
+                    }
+                    session.write(body);
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Can not write body since this exchange is not out capable: " + exchange);
+                    }
+                }
             }
         };
 

Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java?rev=631882&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java Thu Feb 28 00:01:08 2008
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+/**
+ * Verifies that a Mina TCP endpoint configured with {@code sync=true} (InOut pattern)
+ * writes the route's reply back to a client that talks over a plain {@link Socket}.
+ *
+ * @version $Revision$
+ */
+public class MinaTcpWithInOutUsingPlainSocketTest extends TestCase {
+
+    protected CamelContext container = new DefaultCamelContext();
+    // use parameter sync=true to force InOut pattern of the MinaExchange
+    protected String uri = "mina:tcp://localhost:8888?textline=true&sync=true";
+
+    public void testSendAndReceiveOnce() throws Exception {
+        String response = sendAndReceive("World");
+
+        assertNotNull("Nothing received from Mina", response);
+        assertEquals("Hello World", response);
+    }
+
+    public void testSendAndReceiveTwice() throws Exception {
+        String london = sendAndReceive("London");
+        String paris = sendAndReceive("Paris");
+
+        assertNotNull("Nothing received from Mina", london);
+        assertNotNull("Nothing received from Mina", paris);
+        assertEquals("Hello London", london);
+        assertEquals("Hello Paris", paris);
+    }
+
+    /**
+     * Sends one text line to the endpoint over a plain socket and returns the reply line.
+     *
+     * @param input text to send; a newline is appended as the textline delimiter
+     * @return the reply up to, but excluding, the first newline (or end of stream)
+     * @throws IOException if connecting, writing or reading fails
+     */
+    private String sendAndReceive(String input) throws IOException {
+        Socket soc = new Socket();
+        try {
+            soc.connect(new InetSocketAddress("localhost", 8888));
+
+            // must append newline at the end to flag end of textline to Camel-Mina
+            OutputStream os = soc.getOutputStream();
+            os.write((input + "\n").getBytes());
+            os.flush();
+
+            // a single read() may deliver only part of the reply, so read byte by byte
+            // until the newline added by the processor below (or until end of stream)
+            InputStream is = soc.getInputStream();
+            StringBuffer sb = new StringBuffer();
+            for (int b = is.read(); b != -1 && b != '\n'; b = is.read()) {
+                sb.append((char) b);
+            }
+            return sb.toString();
+        } finally {
+            // closing the socket also closes both of its streams, and unlike closing the
+            // streams individually it cannot hit a null reference after an early failure
+            soc.close();
+        }
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        container.addRoutes(createRouteBuilder());
+        container.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        container.stop();
+    }
+
+    /**
+     * Builds the route under test: consumes from {@link #uri} and replies "Hello " + body.
+     */
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(uri).process(new Processor() {
+                    public void process(Exchange e) {
+                        String in = e.getIn().getBody(String.class);
+                        // append newline at end to denote end of data for textline codec
+                        e.getOut().setBody("Hello " + in + "\n");
+                    }
+                });
+            }
+        };
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date