You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/02/28 09:01:09 UTC
svn commit: r631882 - in /activemq/camel/trunk/components/camel-mina/src:
main/java/org/apache/camel/component/mina/
test/java/org/apache/camel/component/mina/
Author: ningjiang
Date: Thu Feb 28 00:01:08 2008
New Revision: 631882
URL: http://svn.apache.org/viewvc?rev=631882&view=rev
Log:
CAMEL-340 applied the patch with thanks to Claus
Added:
activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java (with props)
Modified:
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?rev=631882&r1=631881&r2=631882&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Thu Feb 28 00:01:08 2008
@@ -27,6 +27,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultComponent;
import org.apache.mina.common.ByteBuffer;
@@ -53,6 +54,7 @@
import org.apache.mina.transport.vmpipe.VmPipeConnector;
/**
+ * The component for using the Mina libaray
* @version $Revision$
*/
public class MinaComponent extends DefaultComponent<MinaExchange> {
@@ -99,7 +101,16 @@
// TODO customize the config via URI
SocketConnectorConfig config = new SocketConnectorConfig();
configureSocketCodecFactory(config, parameters);
- return new MinaEndpoint(uri, this, address, acceptor, connector, config);
+ MinaEndpoint endpoint = new MinaEndpoint(uri, this, address, acceptor, connector, config);
+
+ boolean sync = ObjectConverter.toBool(parameters.get("sync"));
+ if (sync) {
+ endpoint.setExchangePattern(ExchangePattern.InOut);
+ } else {
+ endpoint.setExchangePattern(ExchangePattern.InOnly);
+ }
+
+ return endpoint;
}
protected void configureSocketCodecFactory(BaseIoConnectorConfig config, Map parameters) {
Modified: activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=631882&r1=631881&r2=631882&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ activemq/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Thu Feb 28 00:01:08 2008
@@ -16,10 +16,9 @@
*/
package org.apache.camel.component.mina;
-import java.net.SocketAddress;
-
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.common.IoAcceptor;
@@ -27,10 +26,10 @@
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import java.net.SocketAddress;
+
/**
- * A
- *
- * @{link Consumer} for MINA
+ * A @{link Consumer} implementation for MINA
* @version $Revision$
*/
public class MinaConsumer extends DefaultConsumer<MinaExchange> {
@@ -57,8 +56,24 @@
IoHandler handler = new IoHandlerAdapter() {
@Override
public void messageReceived(IoSession session, Object object) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received body: " + object);
+ }
+
MinaExchange exchange = endpoint.createExchange(session, object);
getProcessor().process(exchange);
+
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ Object body = exchange.getOut().getBody();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing body: " + body);
+ }
+ session.write(body);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Can not write body since this exchange is not out capable: " + exchange);
+ }
+ }
}
};
Added: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java?rev=631882&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java (added)
+++ activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java Thu Feb 28 00:01:08 2008
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+/**
+ * @version $Revision$
+ */
+public class MinaTcpWithInOutUsingPlainSocketTest extends TestCase {
+
+ protected CamelContext container = new DefaultCamelContext();
+ // use parameter sync=true to force InOut pattern of the MinaExchange
+ protected String uri = "mina:tcp://localhost:8888?textline=true&sync=true";
+
+ public void testSendAndReceiveOnce() throws Exception {
+ String response = sendAndReceive("World");
+
+ assertNotNull("Nothing received from Mina", response);
+ assertEquals("Hello World", response);
+ }
+
+ public void testSendAndReceiveTwice() throws Exception {
+ String london = sendAndReceive("London");
+ String paris = sendAndReceive("Paris");
+
+ assertNotNull("Nothing received from Mina", london);
+ assertNotNull("Nothing received from Mina", paris);
+ assertEquals("Hello London", london);
+ assertEquals("Hello Paris", paris);
+ }
+
+ private String sendAndReceive(String input) throws IOException {
+ byte buf[] = new byte[128];
+
+ Socket soc = new Socket();
+ soc.connect(new InetSocketAddress("localhost", 8888));
+
+ // Send message using plain Socket to test if this works
+ OutputStream os = null;
+ InputStream is = null;
+ try {
+ os = soc.getOutputStream();
+ // must append newline at the end to flag end of textline to Camel-Mina
+ os.write((input + "\n").getBytes());
+
+ is = soc.getInputStream();
+ is.read(buf);
+ } finally {
+ is.close();
+ os.close();
+ soc.close();
+ }
+
+ // convert the buffer to chars
+ StringBuffer sb = new StringBuffer();
+ for (byte b : buf) {
+ char ch = (char) b;
+ if (ch == '\n' || b == 0) {
+ // newline denotes end of text (added in the end in the processor below)
+ break;
+ } else {
+ sb.append(ch);
+ }
+ }
+
+ return sb.toString();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ container.addRoutes(createRouteBuilder());
+ container.start();
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception {
+ container.stop();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).process(new Processor() {
+ public void process(Exchange e) {
+ String in = e.getIn().getBody(String.class);
+ // append newline at end to denote end of data for textline codec
+ e.getOut().setBody("Hello " + in + "\n");
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTcpWithInOutUsingPlainSocketTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date