You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/01/02 17:00:01 UTC
svn commit: r1226449 [1/5] - in /camel/trunk: ./ apache-camel/
apache-camel/src/main/descriptors/ components/ components/camel-mina2/
components/camel-mina2/src/ components/camel-mina2/src/main/
components/camel-mina2/src/main/java/ components/camel-mi...
Author: davsclaus
Date: Mon Jan 2 15:59:58 2012
New Revision: 1226449
URL: http://svn.apache.org/viewvc?rev=1226449&view=rev
Log:
CAMEL-3471: camel-mina2 component. Thanks to Chad Beaulac for the patch.
Added:
camel/trunk/components/camel-mina2/ (with props)
camel/trunk/components/camel-mina2/pom.xml (with props)
camel/trunk/components/camel-mina2/src/
camel/trunk/components/camel-mina2/src/main/
camel/trunk/components/camel-mina2/src/main/java/
camel/trunk/components/camel-mina2/src/main/java/org/
camel/trunk/components/camel-mina2/src/main/java/org/apache/
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Converter.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Helper.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2PayloadHelper.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineCodecFactory.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java (with props)
camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2UdpProtocolCodecFactory.java (with props)
camel/trunk/components/camel-mina2/src/main/resources/
camel/trunk/components/camel-mina2/src/main/resources/META-INF/
camel/trunk/components/camel-mina2/src/main/resources/META-INF/LICENSE.txt (with props)
camel/trunk/components/camel-mina2/src/main/resources/META-INF/NOTICE.txt (with props)
camel/trunk/components/camel-mina2/src/main/resources/META-INF/services/
camel/trunk/components/camel-mina2/src/main/resources/META-INF/services/org/
camel/trunk/components/camel-mina2/src/main/resources/META-INF/services/org/apache/
camel/trunk/components/camel-mina2/src/main/resources/META-INF/services/org/apache/camel/
camel/trunk/components/camel-mina2/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
camel/trunk/components/camel-mina2/src/main/resources/META-INF/services/org/apache/camel/component/
camel/trunk/components/camel-mina2/src/main/resources/META-INF/services/org/apache/camel/component/mina2
camel/trunk/components/camel-mina2/src/test/
camel/trunk/components/camel-mina2/src/test/data/
camel/trunk/components/camel-mina2/src/test/data/message1.txt (with props)
camel/trunk/components/camel-mina2/src/test/java/
camel/trunk/components/camel-mina2/src/test/java/org/
camel/trunk/components/camel-mina2/src/test/java/org/apache/
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/BaseMina2Test.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/MessageIOSessionTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientServerTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ComponentTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ComponentWithConfigurationTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ConsumerTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ConverterTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeDefaultTimeOutTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileTcpTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FileUdpTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2FiltersTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOnlyRouteTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutCloseSessionWhenCompleteTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutRouteTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutRouteTextLineDelimiterTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2InOutWithForcedNoResponseTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2LoggerOptionTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2MaxLineLengthTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoDefaultCodecTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerAnotherConcurrentTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerConcurrentTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverserServer.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SendToProcessorTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMinaEndpointTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMinaEndpointUDPTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2SpringMultipleUDPTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpLineDelimiterUsingPlainSocketTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineDelimiterTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpTextlineProtocolTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpWithInOutTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpWithInOutUsingPlainSocketTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TcpWithIoOutProcessorExceptionTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpNoCamelTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpUsingTemplateTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2UdpWithInOutUsingPlainSocketTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMCustomCodecTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMFileTcpTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMTextlineProtocolTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VMTransferExchangeOptionTest.java (with props)
camel/trunk/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2VmTest.java (with props)
camel/trunk/components/camel-mina2/src/test/resources/
camel/trunk/components/camel-mina2/src/test/resources/jndi-example.properties (with props)
camel/trunk/components/camel-mina2/src/test/resources/log4j.properties (with props)
camel/trunk/components/camel-mina2/src/test/resources/org/
camel/trunk/components/camel-mina2/src/test/resources/org/apache/
camel/trunk/components/camel-mina2/src/test/resources/org/apache/camel/
camel/trunk/components/camel-mina2/src/test/resources/org/apache/camel/component/
camel/trunk/components/camel-mina2/src/test/resources/org/apache/camel/component/mina2/
camel/trunk/components/camel-mina2/src/test/resources/org/apache/camel/component/mina2/SpringMinaEndpointTest-context.xml (with props)
camel/trunk/components/camel-mina2/src/test/resources/org/apache/camel/component/mina2/SpringMinaEndpointUDPTest-context.xml (with props)
camel/trunk/components/camel-mina2/src/test/resources/org/apache/camel/component/mina2/SpringMultipleUDPTest-context.xml (with props)
camel/trunk/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelMina2Test.java
- copied, changed from r1226435, camel/trunk/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelMinaTest.java
Modified:
camel/trunk/apache-camel/pom.xml
camel/trunk/apache-camel/src/main/descriptors/common-bin.xml
camel/trunk/components/pom.xml
camel/trunk/parent/pom.xml
camel/trunk/platforms/karaf/features/src/main/resources/features.xml
camel/trunk/pom.xml
Modified: camel/trunk/apache-camel/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/apache-camel/pom.xml?rev=1226449&r1=1226448&r2=1226449&view=diff
==============================================================================
--- camel/trunk/apache-camel/pom.xml (original)
+++ camel/trunk/apache-camel/pom.xml Mon Jan 2 15:59:58 2012
@@ -292,6 +292,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-mina2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-msv</artifactId>
<!--Avoid the WARNING message -->
<exclusions>
Modified: camel/trunk/apache-camel/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/camel/trunk/apache-camel/src/main/descriptors/common-bin.xml?rev=1226449&r1=1226448&r2=1226449&view=diff
==============================================================================
--- camel/trunk/apache-camel/src/main/descriptors/common-bin.xml (original)
+++ camel/trunk/apache-camel/src/main/descriptors/common-bin.xml Mon Jan 2 15:59:58 2012
@@ -94,6 +94,7 @@
<include>org.apache.camel:camel-lucene</include>
<include>org.apache.camel:camel-mail</include>
<include>org.apache.camel:camel-mina</include>
+ <include>org.apache.camel:camel-mina2</include>
<include>org.apache.camel:camel-msv</include>
<include>org.apache.camel:camel-mvel</include>
<include>org.apache.camel:camel-mybatis</include>
Propchange: camel/trunk/components/camel-mina2/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Mon Jan 2 15:59:58 2012
@@ -0,0 +1,16 @@
+.pmd
+.checkstyle
+.ruleset
+target
+.settings
+.classpath
+.project
+.wtpmodules
+prj.el
+.jdee_classpath
+.jdee_sources
+velocity.log
+eclipse-classes
+*.ipr
+*.iml
+*.iws
Added: camel/trunk/components/camel-mina2/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/pom.xml?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/pom.xml (added)
+++ camel/trunk/components/camel-mina2/pom.xml Mon Jan 2 15:59:58 2012
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-parent</artifactId>
+ <version>2.10-SNAPSHOT</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>camel-mina2</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: MINA-2</name>
+ <description>Camel MINA 2.x support</description>
+
+ <properties>
+ <camel.osgi.export.pkg>org.apache.camel.component.mina2.*</camel.osgi.export.pkg>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-core</artifactId>
+ <version>${mina2-version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymockclassextension</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <forkMode>pertest</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ <excludes>
+ <!-- This test just show the shutdown error -->
+ <exclude>**/Mina2ProducerShutdownTest.*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <!-- allows the special unittest to be ran via 'mvn compile exec:java' -->
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <mainClass>org.apache.camel.component.mina2.Mina2ProducerShutdownTest</mainClass>
+ <includePluginDependencies>false</includePluginDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Propchange: camel/trunk/components/camel-mina2/pom.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/pom.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: camel/trunk/components/camel-mina2/pom.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.mina.core.filterchain.IoFilter;
+
+/**
+ * Component for Apache MINA 2.x.
+ *
+ * @version
+ */
+public class Mina2Component extends DefaultComponent {
+
+ private Mina2Configuration configuration;
+
+ public Mina2Component() {
+ }
+
+ public Mina2Component(CamelContext context) {
+ super(context);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ // Using the configuration which set by the component as a default one
+ // Since the configuration's properties will be set by the URI
+ // we need to copy or create a new MinaConfiguration here
+ // Using the configuration which set by the component as a default one
+ // Since the configuration's properties will be set by the URI
+ // we need to copy or create a new MinaConfiguration here
+ Mina2Configuration config;
+ if (configuration != null) {
+ config = configuration.copy();
+ } else {
+ config = new Mina2Configuration();
+ }
+
+ URI u = new URI(remaining);
+ config.setHost(u.getHost());
+ config.setPort(u.getPort());
+ config.setProtocol(u.getScheme());
+ config.setFilters(resolveAndRemoveReferenceListParameter(parameters, "filters", IoFilter.class));
+ setProperties(config, parameters);
+
+ return createEndpoint(uri, config);
+ }
+
+ public Endpoint createEndpoint(Mina2Configuration config) throws Exception {
+ return createEndpoint(null, config);
+ }
+
+ private Endpoint createEndpoint(String uri, Mina2Configuration config) throws Exception {
+ ObjectHelper.notNull(getCamelContext(), "camelContext");
+ Endpoint endpoint;
+ String protocol = config.getProtocol();
+ // if mistyped uri then protocol can be null
+
+ if (protocol != null) {
+ if (protocol.equals("tcp") || config.isDatagramProtocol() || protocol.equals("vm")) {
+ return new Mina2Endpoint(uri, this, config);
+ }
+ }
+ // protocol not resolved so error
+ throw new IllegalArgumentException("Unrecognised MINA protocol: " + protocol + " for uri: " + uri);
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ public Mina2Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(Mina2Configuration configuration) {
+ this.configuration = configuration;
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Component.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+
+/**
+ * Mina2 configuration
+ */
+public class Mina2Configuration implements Cloneable {
+
+ private String protocol;
+ private String host;
+ private int port;
+ private boolean sync = true;
+ private boolean textline;
+ private Mina2TextLineDelimiter textlineDelimiter;
+ private ProtocolCodecFactory codec;
+ private String encoding;
+ private long timeout = 3000;
+ private boolean lazySessionCreation = true;
+ private boolean transferExchange;
+ private boolean minaLogger;
+ private int encoderMaxLineLength = -1;
+ private int decoderMaxLineLength = -1;
+ private List<IoFilter> filters;
+ private boolean allowDefaultCodec = true;
+ private boolean disconnect;
+ private boolean disconnectOnNoReply = true;
+ private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
+
+ /**
+ * Returns a copy of this configuration
+ */
+ public Mina2Configuration copy() {
+ try {
+ return (Mina2Configuration) clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+ public String getCharsetName() {
+ if (encoding == null) {
+ return null;
+ }
+ if (!Charset.isSupported(encoding)) {
+ throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
+ }
+
+ return Charset.forName(encoding).name();
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public boolean isSync() {
+ return sync;
+ }
+
+ public void setSync(boolean sync) {
+ this.sync = sync;
+ }
+
+ public boolean isTextline() {
+ return textline;
+ }
+
+ public void setTextline(boolean textline) {
+ this.textline = textline;
+ }
+
+ public Mina2TextLineDelimiter getTextlineDelimiter() {
+ return textlineDelimiter;
+ }
+
+ public void setTextlineDelimiter(Mina2TextLineDelimiter textlineDelimiter) {
+ this.textlineDelimiter = textlineDelimiter;
+ }
+
+ public ProtocolCodecFactory getCodec() {
+ return codec;
+ }
+
+ public void setCodec(ProtocolCodecFactory codec) {
+ this.codec = codec;
+ }
+
+ public String getEncoding() {
+ return encoding;
+ }
+
+ public void setEncoding(String encoding) {
+ this.encoding = encoding;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ public boolean isLazySessionCreation() {
+ return lazySessionCreation;
+ }
+
+ public void setLazySessionCreation(boolean lazySessionCreation) {
+ this.lazySessionCreation = lazySessionCreation;
+ }
+
+ public boolean isTransferExchange() {
+ return transferExchange;
+ }
+
+ public void setTransferExchange(boolean transferExchange) {
+ this.transferExchange = transferExchange;
+ }
+
+ public void setEncoderMaxLineLength(int encoderMaxLineLength) {
+ this.encoderMaxLineLength = encoderMaxLineLength;
+ }
+
+ public int getEncoderMaxLineLength() {
+ return encoderMaxLineLength;
+ }
+
+ public void setDecoderMaxLineLength(int decoderMaxLineLength) {
+ this.decoderMaxLineLength = decoderMaxLineLength;
+ }
+
+ public int getDecoderMaxLineLength() {
+ return decoderMaxLineLength;
+ }
+
+ public boolean isMinaLogger() {
+ return minaLogger;
+ }
+
+ public void setMinaLogger(boolean minaLogger) {
+ this.minaLogger = minaLogger;
+ }
+
+ public List<IoFilter> getFilters() {
+ return filters;
+ }
+
+ public void setFilters(List<IoFilter> filters) {
+ this.filters = filters;
+ }
+
+ public boolean isDatagramProtocol() {
+ return protocol.equals("udp");
+ }
+
+ public void setAllowDefaultCodec(boolean allowDefaultCodec) {
+ this.allowDefaultCodec = allowDefaultCodec;
+ }
+
+ public boolean isAllowDefaultCodec() {
+ return allowDefaultCodec;
+ }
+
+ public boolean isDisconnect() {
+ return disconnect;
+ }
+
+ public void setDisconnect(boolean disconnect) {
+ this.disconnect = disconnect;
+ }
+
+ public boolean isDisconnectOnNoReply() {
+ return disconnectOnNoReply;
+ }
+
+ public void setDisconnectOnNoReply(boolean disconnectOnNoReply) {
+ this.disconnectOnNoReply = disconnectOnNoReply;
+ }
+
+ public LoggingLevel getNoReplyLogLevel() {
+ return noReplyLogLevel;
+ }
+
+ public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) {
+ this.noReplyLogLevel = noReplyLogLevel;
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+/**
+ * Mina constants
+ *
+ * @version
+ */
+public final class Mina2Constants {
+
+ public static final transient String MINA_CLOSE_SESSION_WHEN_COMPLETE = "CamelMina2CloseSessionWhenComplete";
+ /** The key of the IoSession which is stored in the message header*/
+ public static final transient String MINA_IOSESSION = "CamelMina2IoSession";
+ /** The socket address of local machine that received the message. */
+ public static final transient String MINA_LOCAL_ADDRESS = "CamelMina2LocalAddress";
+ /** The socket address of the remote machine that send the message. */
+ public static final transient String MINA_REMOTE_ADDRESS = "CamelMina2RemoteAddress";
+
+ private Mina2Constants() {
+ // Utility class
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Constants.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.session.IoSessionConfig;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
+import org.apache.mina.filter.codec.textline.LineDelimiter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
+import org.apache.mina.transport.socket.nio.NioProcessor;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.Consumer Consumer} implementation for Apache MINA.
+ *
+ * @version
+ */
+public class Mina2Consumer extends DefaultConsumer {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(Mina2Consumer.class);
+ private SocketAddress address;
+ private IoAcceptor acceptor;
+ private CamelLogger noReplyLogger;
+ private Mina2Configuration configuration;
+ private IoSessionConfig acceptorConfig;
+ private boolean sync;
+
+ public Mina2Consumer(final Mina2Endpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ this.configuration = endpoint.getConfiguration();
+ this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());
+ //
+ // All mina2 endpoints are InOut. The endpoints are asynchronous.
+ // Endpoints can send "n" messages and receive "m" messages.
+ //
+ this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
+
+ String protocol = configuration.getProtocol();
+ if (protocol.equals("tcp")) {
+ createSocketEndpoint(protocol, configuration);
+ } else if (configuration.isDatagramProtocol()) {
+ createDatagramEndpoint(protocol, configuration);
+ } else if (protocol.equals("vm")) {
+ createVmEndpoint(protocol, configuration);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ IoHandler handler = new ReceiveHandler();
+ acceptor.setHandler(handler);
+ acceptor.bind(address);
+ LOG.info("Bound to server address: {} using acceptor: {}", address, acceptor);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ LOG.info("Unbinding from server address: {} using acceptor: {}", address, acceptor);
+ acceptor.unbind(address);
+ super.doStop();
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ protected void createVmEndpoint(String uri, Mina2Configuration configuration) {
+
+ boolean minaLogger = configuration.isMinaLogger();
+ List<IoFilter> filters = configuration.getFilters();
+
+ address = new VmPipeAddress(configuration.getPort());
+ acceptor = new VmPipeAcceptor();
+
+ // acceptor connectorConfig
+ configureCodecFactory("Mina2Consumer", acceptor, configuration);
+ if (minaLogger) {
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+ }
+ appendIoFiltersToChain(filters, acceptor.getFilterChain());
+ }
+
+ protected void createSocketEndpoint(String uri, Mina2Configuration configuration) {
+ LOG.debug("createSocketEndpoint");
+ boolean minaLogger = configuration.isMinaLogger();
+ long timeout = configuration.getTimeout();
+ List<IoFilter> filters = configuration.getFilters();
+
+ address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+
+ acceptor = new NioSocketAcceptor(
+ new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketAcceptor")));
+
+ // acceptor connectorConfig
+ acceptorConfig = acceptor.getSessionConfig();
+ configureCodecFactory("Mina2Consumer", acceptor, configuration);
+ ((NioSocketAcceptor) acceptor).setReuseAddress(true);
+ acceptor.setCloseOnDeactivation(true);
+ acceptor.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ if (minaLogger) {
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+ }
+ appendIoFiltersToChain(filters, acceptor.getFilterChain());
+ }
+
+ protected void configureCodecFactory(String type, IoService service, Mina2Configuration configuration) {
+ if (configuration.getCodec() != null) {
+ addCodecFactory(service, configuration.getCodec());
+ } else if (configuration.isAllowDefaultCodec()) {
+ configureDefaultCodecFactory(type, service, configuration);
+ }
+ }
+
+ protected void configureDefaultCodecFactory(String type, IoService service, Mina2Configuration configuration) {
+ if (configuration.isTextline()) {
+ Charset charset = getEncodingParameter(type, configuration);
+ LineDelimiter delimiter = getLineDelimiterParameter(configuration.getTextlineDelimiter());
+ Mina2TextLineCodecFactory codecFactory = new Mina2TextLineCodecFactory(charset, delimiter);
+ if (configuration.getEncoderMaxLineLength() > 0) {
+ codecFactory.setEncoderMaxLineLength(configuration.getEncoderMaxLineLength());
+ }
+ if (configuration.getDecoderMaxLineLength() > 0) {
+ codecFactory.setDecoderMaxLineLength(configuration.getDecoderMaxLineLength());
+ }
+ addCodecFactory(service, codecFactory);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})",
+ new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter});
+ LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}",
+ codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
+ }
+ } else {
+ ObjectSerializationCodecFactory codecFactory = new ObjectSerializationCodecFactory();
+ addCodecFactory(service, codecFactory);
+ LOG.debug("{}: Using ObjectSerializationCodecFactory: {}", type, codecFactory);
+ }
+
+ }
+
+ protected void createDatagramEndpoint(String uri, Mina2Configuration configuration) {
+ boolean minaLogger = configuration.isMinaLogger();
+ long timeout = configuration.getTimeout();
+ List<IoFilter> filters = configuration.getFilters();
+
+ address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+ acceptor = new NioDatagramAcceptor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
+
+ // acceptor connectorConfig
+ acceptorConfig = acceptor.getSessionConfig();
+ configureDataGramCodecFactory("MinaConsumer", acceptor, configuration);
+ acceptor.setCloseOnDeactivation(true);
+ // reuse address is default true for datagram
+ //acceptor.getFilterChain().addLast("threadPool",
+ // new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "MinaThreadPool")));
+ if (minaLogger) {
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+ }
+ appendIoFiltersToChain(filters, acceptor.getFilterChain());
+ }
+
+ /**
+ * For datagrams the entire message is available as a single IoBuffer so lets just pass those around by default
+ * and try converting whatever they payload is into IoBuffer unless some custom converter is specified
+ */
+ protected void configureDataGramCodecFactory(final String type, final IoService service, final Mina2Configuration configuration) {
+ ProtocolCodecFactory codecFactory = configuration.getCodec();
+ if (codecFactory == null) {
+ final Charset charset = getEncodingParameter(type, configuration);
+
+ codecFactory = new Mina2UdpProtocolCodecFactory(this.getEndpoint().getCamelContext(), charset);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Using CodecFactory: {} using encoding: {}", new Object[]{type, codecFactory, charset});
+ }
+ }
+
+ addCodecFactory(service, codecFactory);
+ }
+
+ private void addCodecFactory(IoService service, ProtocolCodecFactory codecFactory) {
+ service.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
+ }
+
+ private static LineDelimiter getLineDelimiterParameter(Mina2TextLineDelimiter delimiter) {
+ if (delimiter == null) {
+ return LineDelimiter.DEFAULT;
+ }
+
+ switch (delimiter) {
+ case DEFAULT:
+ return LineDelimiter.DEFAULT;
+ case AUTO:
+ return LineDelimiter.AUTO;
+ case UNIX:
+ return LineDelimiter.UNIX;
+ case WINDOWS:
+ return LineDelimiter.WINDOWS;
+ case MAC:
+ return LineDelimiter.MAC;
+ default:
+ throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter);
+ }
+ }
+
+ private Charset getEncodingParameter(String type, Mina2Configuration configuration) {
+ String encoding = configuration.getEncoding();
+ if (encoding == null) {
+ encoding = Charset.defaultCharset().name();
+ // set in on configuration so its updated
+ configuration.setEncoding(encoding);
+ LOG.debug("{}: No encoding parameter using default charset: {}", type, encoding);
+ }
+ if (!Charset.isSupported(encoding)) {
+ throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
+ }
+
+ return Charset.forName(encoding);
+ }
+
+ private void appendIoFiltersToChain(List<IoFilter> filters, DefaultIoFilterChainBuilder filterChain) {
+ if (filters != null && filters.size() > 0) {
+ for (IoFilter ioFilter : filters) {
+ filterChain.addLast(ioFilter.getClass().getCanonicalName(), ioFilter);
+ }
+ }
+ }
+
+ @Override
+ public Mina2Endpoint getEndpoint() {
+ return (Mina2Endpoint) super.getEndpoint();
+ }
+
+ public IoAcceptor getAcceptor() {
+ return acceptor;
+ }
+
+ public void setAcceptor(IoAcceptor acceptor) {
+ this.acceptor = acceptor;
+ }
+
+ /**
+ * Handles consuming messages and replying if the exchange is out capable.
+ */
+ private final class ReceiveHandler extends IoHandlerAdapter {
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ // close invalid session
+ if (session != null) {
+ LOG.warn("Closing session as an exception was thrown from MINA");
+ session.close(true);
+ }
+
+ // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
+ throw new CamelException(cause);
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object object) throws Exception {
+ // log what we received
+ if (LOG.isDebugEnabled()) {
+ Object in = object;
+ if (in instanceof byte[]) {
+ // byte arrays is not readable so convert to string
+ in = getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in);
+ }
+ LOG.debug("Received body: {}", in);
+ }
+
+ Exchange exchange = getEndpoint().createExchange(session, object);
+ //Set the exchange charset property for converting
+ if (getEndpoint().getConfiguration().getCharsetName() != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getEndpoint().getConfiguration().getCharsetName()));
+ }
+
+ try {
+ getProcessor().process(exchange);
+ } catch (Throwable e) {
+ getExceptionHandler().handleException(e);
+ }
+
+ //
+ // If there's a response to send, send it.
+ //
+ boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
+ Object response = null;
+ response = Mina2PayloadHelper.getOut(getEndpoint(), exchange);
+
+ boolean failed = exchange.isFailed();
+ if (failed && !getEndpoint().getConfiguration().isTransferExchange()) {
+ if (exchange.getException() != null) {
+ response = exchange.getException();
+ } else {
+ // failed and no exception, must be a fault
+ response = exchange.getOut().getBody();
+ }
+ }
+
+ if (response != null) {
+ LOG.debug("Writing body: {}", response);
+ Mina2Helper.writeBody(session, response, exchange);
+ } else {
+ LOG.debug("Writing no response");
+ disconnect = Boolean.TRUE;
+ }
+
+ // should session be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
+ }
+
+ // should we disconnect, the header can override the configuration
+ if (close != null) {
+ disconnect = close;
+ }
+ if (disconnect) {
+ LOG.debug("Closing session when complete at address: {}", address);
+ session.close(true);
+ }
+ }
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Converter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Converter.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Converter.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Converter.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.mina.core.buffer.IoBuffer;
+
+/**
+ * A set of converter methods for working with MINA2 types
+ *
+ * @version
+ */
+@Converter
+public final class Mina2Converter {
+
+ private Mina2Converter() {
+ //Utility Class
+ }
+
+ @Converter
+ public static byte[] toByteArray(IoBuffer buffer) {
+ byte[] answer = new byte[buffer.remaining()];
+ buffer.get(answer);
+ return answer;
+ // we should not mark and reset the buffer with mina2
+ }
+
+ @Converter
+ public static String toString(IoBuffer buffer, Exchange exchange) {
+ byte[] bytes = toByteArray(buffer);
+ // use type converter as it can handle encoding set on the Exchange
+ return exchange.getContext().getTypeConverter().convertTo(String.class, exchange, bytes);
+ }
+
+ @Converter
+ public static InputStream toInputStream(IoBuffer buffer) {
+ return buffer.asInputStream();
+ }
+
+ @Converter
+ public static ObjectInput toObjectInput(IoBuffer buffer) throws IOException {
+ InputStream is = toInputStream(buffer);
+ return new ObjectInputStream(is);
+ }
+
+ @Converter
+ public static IoBuffer toIoBuffer(byte[] bytes) {
+ IoBuffer buf = IoBuffer.allocate(bytes.length);
+ buf.put(bytes);
+ return buf;
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Converter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Converter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.MultipleConsumersSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.mina.core.session.IoSession;
+
+/**
+ * Endpoint for Camel MINA.
+ *
+ * @version
+ */
+public class Mina2Endpoint extends DefaultEndpoint implements MultipleConsumersSupport {
+
+ private Mina2Configuration configuration;
+
+ public Mina2Endpoint() {
+ }
+
+ public Mina2Endpoint(String endpointUri, Component component, Mina2Configuration configuration) {
+ super(endpointUri, component);
+ this.configuration = configuration;
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ ObjectHelper.notNull(configuration, "configuration");
+ return new Mina2Producer(this);
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ ObjectHelper.notNull(configuration, "configuration");
+ return new Mina2Consumer(this, processor);
+ }
+
+ public Exchange createExchange(IoSession session, Object payload) {
+ Exchange exchange = createExchange();
+ exchange.getIn().setHeader(Mina2Constants.MINA_IOSESSION, session);
+ exchange.getIn().setHeader(Mina2Constants.MINA_LOCAL_ADDRESS, session.getLocalAddress());
+ exchange.getIn().setHeader(Mina2Constants.MINA_REMOTE_ADDRESS, session.getRemoteAddress());
+ Mina2PayloadHelper.setIn(exchange, payload);
+ return exchange;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ public boolean isMultipleConsumersSupported() {
+ // only datagram should allow multiple consumers
+ return configuration.isDatagramProtocol();
+ }
+
+ // Properties
+ // -------------------------------------------------------------------------
+ public Mina2Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(Mina2Configuration configuration) {
+ this.configuration = configuration;
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Endpoint.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Helper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Helper.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Helper.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Helper.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.mina.core.future.WriteFuture;
+import org.apache.mina.core.session.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class used internally by camel-mina using Apache MINA.
+ */
+public final class Mina2Helper {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(Mina2Helper.class);
+
+ private Mina2Helper() {
+ //Utility Class
+ }
+
+ /**
+ * Writes the given body to MINA session. Will wait until the body has been written.
+ *
+ * @param session the MINA session
+ * @param body the body to write (send)
+ * @param exchange the exchange
+ * @throws CamelExchangeException is thrown if the body could not be written for some reasons
+ * (eg remote connection is closed etc.)
+ */
+ public static void writeBody(IoSession session, Object body, Exchange exchange) throws CamelExchangeException {
+ LOG.trace("write exchange [{}] with body [{}]", exchange, body);
+ // the write operation is asynchronous
+ session.write(body);
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Helper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Helper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2PayloadHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2PayloadHelper.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2PayloadHelper.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2PayloadHelper.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+
+/**
+ * Helper to get and set the correct payload when transferring data using camel-mina.
+ * Always use this helper instead of direct access on the exchange object.
+ * <p/>
+ * This helper ensures that we can also transfer exchange objects over the wire using the
+ * <tt>transferExchange=true</tt> option.
+ *
+ * @version
+ */
+public final class Mina2PayloadHelper {
+
+ private Mina2PayloadHelper() {
+ //Utility Class
+ }
+
+ public static Object getIn(Mina2Endpoint endpoint, Exchange exchange) {
+ if (endpoint.getConfiguration().isTransferExchange()) {
+ // we should transfer the entire exchange over the wire (includes in/out)
+ return DefaultExchangeHolder.marshal(exchange);
+ } else {
+ // normal transfer using the body only
+ return exchange.getIn().getBody();
+ }
+ }
+
+ public static Object getOut(Mina2Endpoint endpoint, Exchange exchange) {
+ if (endpoint.getConfiguration().isTransferExchange()) {
+ // we should transfer the entire exchange over the wire (includes in/out)
+ return DefaultExchangeHolder.marshal(exchange);
+ } else {
+ // normal transfer using the body only
+ return exchange.getOut().getBody();
+ }
+ }
+
+ public static void setIn(Exchange exchange, Object payload) {
+ if (payload instanceof DefaultExchangeHolder) {
+ DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+ } else {
+ // normal transfer using the body only
+ exchange.getIn().setBody(payload);
+ }
+ }
+
+ public static void setOut(Exchange exchange, Object payload) {
+ if (payload instanceof DefaultExchangeHolder) {
+ DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+ } else {
+ // normal transfer using the body only and preserve the headers
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ exchange.getOut().setBody(payload);
+ }
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2PayloadHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2PayloadHelper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java?rev=1226449&view=auto
==============================================================================
--- camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java (added)
+++ camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java Mon Jan 2 15:59:58 2012
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.future.CloseFuture;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.service.IoConnector;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.service.IoService;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.session.IoSessionConfig;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
+import org.apache.mina.filter.codec.textline.LineDelimiter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.transport.socket.nio.NioDatagramConnector;
+import org.apache.mina.transport.socket.nio.NioProcessor;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.Producer} implementation for MINA
+ *
+ * @version
+ */
+public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(Mina2Producer.class);
+ private IoSession session;
+ private CountDownLatch latch;
+ private boolean lazySessionCreation;
+ private long timeout;
+ private SocketAddress address;
+ private IoConnector connector;
+ private boolean sync;
+ private CamelLogger noReplyLogger;
+ private Mina2Configuration configuration;
+ private IoSessionConfig connectorConfig;
+
+ public Mina2Producer(Mina2Endpoint endpoint) {
+ super(endpoint);
+ configuration = endpoint.getConfiguration();
+ this.lazySessionCreation = configuration.isLazySessionCreation();
+ this.timeout = configuration.getTimeout();
+ this.sync = configuration.isSync();
+ this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());
+
+ String protocol = configuration.getProtocol();
+ if (protocol.equals("tcp")) {
+ createSocketEndpoint(protocol);
+ } else if (configuration.isDatagramProtocol()) {
+ createDatagramEndpoint(protocol);
+ } else if (protocol.equals("vm")) {
+ createVmEndpoint(protocol);
+ }
+ }
+
+ @Override
+ public Mina2Endpoint getEndpoint() {
+ return (Mina2Endpoint) super.getEndpoint();
+ }
+
+ @Override
+ public boolean isSingleton() {
+ // the producer should not be singleton otherwise cannot use concurrent producers and safely
+ // use request/reply with correct correlation
+ return false;
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ LOG.debug("Mina2Producer process");
+
+
+ if (session == null && !lazySessionCreation) {
+ throw new IllegalStateException("Not started yet!");
+ }
+ if (session == null || !session.isConnected()) {
+ openConnection();
+ }
+
+ // set the exchange encoding property
+ if (getEndpoint().getConfiguration().getCharsetName() != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(getEndpoint().getConfiguration().getCharsetName()));
+ }
+
+ Object body = Mina2PayloadHelper.getIn(getEndpoint(), exchange);
+ if (body == null) {
+ noReplyLogger.log("No payload to send for exchange: " + exchange);
+ return; // exit early since nothing to write
+ }
+
+ // if textline enabled then covert to a String which must be used for textline
+ if (getEndpoint().getConfiguration().isTextline()) {
+ body = getEndpoint().getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+ }
+
+ // if sync is true then we should also wait for a response (synchronous mode)
+ if (sync) {
+ // only initialize latch if we should get a response
+ latch = new CountDownLatch(1);
+ // reset handler if we expect a response
+ ResponseHandler handler = (ResponseHandler) session.getHandler();
+ handler.reset();
+ }
+
+ // log what we are writing
+ if (LOG.isDebugEnabled()) {
+ Object out = body;
+ if (body instanceof byte[]) {
+ // byte arrays is not readable so convert to string
+ out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
+ }
+ LOG.debug("Writing body : {}", out);
+ }
+ // write the body
+ Mina2Helper.writeBody(session, body, exchange);
+
+ if (sync) {
+ // wait for response, consider timeout
+ LOG.debug("Waiting for response using timeout {} millis.", timeout);
+ boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
+ if (!done) {
+ throw new ExchangeTimedOutException(exchange, timeout);
+ }
+
+ // did we get a response
+ ResponseHandler handler = (ResponseHandler) session.getHandler();
+ if (handler.getCause() != null) {
+ throw new CamelExchangeException("Error occurred in ResponseHandler", exchange, handler.getCause());
+ } else if (!handler.isMessageReceived()) {
+ // no message received
+ throw new ExchangeTimedOutException(exchange, timeout);
+ } else {
+ // set the result on either IN or OUT on the original exchange depending on its pattern
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ Mina2PayloadHelper.setOut(exchange, handler.getMessage());
+ } else {
+ Mina2PayloadHelper.setIn(exchange, handler.getMessage());
+ }
+ }
+ }
+
+ // should session be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE, Boolean.class);
+ }
+
+ // should we disconnect, the header can override the configuration
+ boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
+ if (close != null) {
+ disconnect = close;
+ }
+ if (disconnect) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing session when complete at address: {}", address);
+ }
+ session.close(true);
+ }
+ }
+
+ public DefaultIoFilterChainBuilder getFilterChain() {
+ return connector.getFilterChain();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ if (!lazySessionCreation) {
+ openConnection();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping connector: {} at address: {}", connector, address);
+ }
+ closeConnection();
+ super.doStop();
+ }
+
+ private void closeConnection() {
+ if (session != null) {
+ CloseFuture closeFuture = session.close(true);
+ closeFuture.awaitUninterruptibly();
+ }
+
+ connector.dispose(true);
+ }
+
+ private void openConnection() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", new Object[]{address, connector, timeout});
+ }
+ // connect and wait until the connection is established
+ if (connectorConfig != null) {
+ connector.getSessionConfig().setAll(connectorConfig);
+ }
+
+ connector.setHandler(new ResponseHandler(getEndpoint()));
+ ConnectFuture future = connector.connect(address);
+ future.awaitUninterruptibly();
+ session = future.getSession();
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ protected void createVmEndpoint(String uri) {
+
+ boolean minaLogger = configuration.isMinaLogger();
+ List<IoFilter> filters = configuration.getFilters();
+
+ address = new VmPipeAddress(configuration.getPort());
+ connector = new VmPipeConnector();
+
+ // connector config
+ if (minaLogger) {
+ connector.getFilterChain().addLast("logger", new LoggingFilter());
+ }
+ appendIoFiltersToChain(filters, connector.getFilterChain());
+ configureCodecFactory("Mina2Producer", connector);
+
+ // set sync or async mode after endpoint is created
+ if (sync) {
+ this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
+ } else {
+ this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
+ }
+ }
+
+ protected void createSocketEndpoint(String uri) {
+ LOG.debug("createSocketEndpoint");
+ boolean minaLogger = configuration.isMinaLogger();
+ long timeout = configuration.getTimeout();
+ List<IoFilter> filters = configuration.getFilters();
+
+ address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+
+ connector = new NioSocketConnector(
+ new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaSocketConnector")));
+
+ // connector config
+ connectorConfig = connector.getSessionConfig();
+ connector.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ if (minaLogger) {
+ connector.getFilterChain().addLast("logger", new LoggingFilter());
+ }
+ appendIoFiltersToChain(filters, connector.getFilterChain());
+ configureCodecFactory("Mina2Producer", connector);
+ // set connect timeout to mina in seconds
+ connector.setConnectTimeoutMillis(timeout);
+
+ // set sync or async mode after endpoint is created
+ if (sync) {
+ this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
+ } else {
+ this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
+ }
+ }
+
+ protected void configureCodecFactory(String type, IoService service) {
+ LOG.debug("configureCodecFactory");
+ if (configuration.getCodec() != null) {
+ addCodecFactory(service, configuration.getCodec());
+ } else if (configuration.isAllowDefaultCodec()) {
+ configureDefaultCodecFactory(type, service);
+ }
+ }
+
+ protected void configureDefaultCodecFactory(String type, IoService service) {
+ LOG.debug("configureDefaultCodecFactory");
+ if (configuration.isTextline()) {
+ Charset charset = getEncodingParameter(type, configuration);
+ LineDelimiter delimiter = getLineDelimiterParameter(configuration.getTextlineDelimiter());
+ Mina2TextLineCodecFactory codecFactory = new Mina2TextLineCodecFactory(charset, delimiter);
+ if (configuration.getEncoderMaxLineLength() > 0) {
+ codecFactory.setEncoderMaxLineLength(configuration.getEncoderMaxLineLength());
+ }
+ if (configuration.getDecoderMaxLineLength() > 0) {
+ codecFactory.setDecoderMaxLineLength(configuration.getDecoderMaxLineLength());
+ }
+ addCodecFactory(service, codecFactory);
+ LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})",
+ new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter});
+ LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}",
+ codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
+ } else {
+ ObjectSerializationCodecFactory codecFactory = new ObjectSerializationCodecFactory();
+ addCodecFactory(service, codecFactory);
+ LOG.debug("{}: Using ObjectSerializationCodecFactory: {}", type, codecFactory);
+ }
+ LOG.debug("configureDefaultCodecFactory exit");
+
+ }
+
+ protected void createDatagramEndpoint(String uri) {
+ boolean minaLogger = configuration.isMinaLogger();
+ boolean transferExchange = configuration.isTransferExchange();
+ List<IoFilter> filters = configuration.getFilters();
+
+ if (transferExchange) {
+ throw new IllegalArgumentException("transferExchange=true is not supported for datagram protocol");
+ }
+
+ address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+ connector = new NioDatagramConnector(
+ new NioProcessor(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector")));
+ connectorConfig = connector.getSessionConfig();
+ connector.getFilterChain().addLast("threadPool",
+ new ExecutorFilter(this.getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaThreadPool")));
+ if (minaLogger) {
+ connector.getFilterChain().addLast("logger", new LoggingFilter());
+ }
+ appendIoFiltersToChain(filters, connector.getFilterChain());
+ configureDataGramCodecFactory("Mina2Producer", connector, configuration);
+ // set connect timeout to mina in seconds
+ connector.setConnectTimeoutMillis(timeout);
+
+ // set sync or async mode after endpoint is created
+ if (sync) {
+ this.getEndpoint().setExchangePattern(ExchangePattern.InOut);
+ } else {
+ this.getEndpoint().setExchangePattern(ExchangePattern.InOnly);
+ }
+ }
+
+ /**
+ * For datagrams the entire message is available as a single IoBuffer so lets just pass those around by default
+ * and try converting whatever they payload is into IoBuffer unless some custom converter is specified
+ */
+ protected void configureDataGramCodecFactory(final String type, final IoService service, final Mina2Configuration configuration) {
+ ProtocolCodecFactory codecFactory = configuration.getCodec();
+ if (codecFactory == null) {
+ final Charset charset = getEncodingParameter(type, configuration);
+
+ codecFactory = new Mina2UdpProtocolCodecFactory(this.getEndpoint().getCamelContext(), charset);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Using CodecFactory: {} using encoding: {}", new Object[]{type, codecFactory, charset});
+ }
+ }
+
+ addCodecFactory(service, codecFactory);
+ }
+
+ private void addCodecFactory(IoService service, ProtocolCodecFactory codecFactory) {
+ LOG.debug("addCodecFactory name: {}", codecFactory.getClass().getName());
+
+ service.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
+ }
+
+ private static LineDelimiter getLineDelimiterParameter(Mina2TextLineDelimiter delimiter) {
+ if (delimiter == null) {
+ return LineDelimiter.DEFAULT;
+ }
+
+ switch (delimiter) {
+ case DEFAULT:
+ return LineDelimiter.DEFAULT;
+ case AUTO:
+ return LineDelimiter.AUTO;
+ case UNIX:
+ return LineDelimiter.UNIX;
+ case WINDOWS:
+ return LineDelimiter.WINDOWS;
+ case MAC:
+ return LineDelimiter.MAC;
+ default:
+ throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter);
+ }
+ }
+
+ private Charset getEncodingParameter(String type, Mina2Configuration configuration) {
+ String encoding = configuration.getEncoding();
+ if (encoding == null) {
+ encoding = Charset.defaultCharset().name();
+ // set in on configuration so its updated
+ configuration.setEncoding(encoding);
+ LOG.debug("{}: No encoding parameter using default charset: {}", type, encoding);
+ }
+ if (!Charset.isSupported(encoding)) {
+ throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
+ }
+
+ return Charset.forName(encoding);
+ }
+
+ private void appendIoFiltersToChain(List<IoFilter> filters, DefaultIoFilterChainBuilder filterChain) {
+ if (filters != null && filters.size() > 0) {
+ for (IoFilter ioFilter : filters) {
+ filterChain.addLast(ioFilter.getClass().getCanonicalName(), ioFilter);
+ }
+ }
+ }
+
+ /**
+ * Handles response from session writes
+ */
+ private final class ResponseHandler extends IoHandlerAdapter {
+
+ private Mina2Endpoint endpoint;
+ private Object message;
+ private Throwable cause;
+ private boolean messageReceived;
+
+ private ResponseHandler(Mina2Endpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public void reset() {
+ this.message = null;
+ this.cause = null;
+ this.messageReceived = false;
+ }
+
+ @Override
+ public void messageReceived(IoSession ioSession, Object message) throws Exception {
+ LOG.debug("Message received: {}", message);
+ this.message = message;
+ messageReceived = true;
+ cause = null;
+ countDown();
+ }
+
+ protected void countDown() {
+ CountDownLatch downLatch = latch;
+ if (downLatch != null) {
+ downLatch.countDown();
+ }
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ if (sync && !messageReceived) {
+ // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Session closed but no message received from address: {}", address);
+ }
+ // session was closed but no message received. This could be because the remote server had an internal error
+ // and could not return a response. We should count down to stop waiting for a response
+ countDown();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(IoSession ioSession, Throwable cause) {
+ LOG.error("Exception on receiving message from address: " + address
+ + " using connector: " + connector, cause);
+ this.message = null;
+ this.messageReceived = false;
+ this.cause = cause;
+ if (ioSession != null) {
+ ioSession.close(true);
+ }
+ }
+
+ public Throwable getCause() {
+ return this.cause;
+ }
+
+ public Object getMessage() {
+ return this.message;
+ }
+
+ public boolean isMessageReceived() {
+ return messageReceived;
+ }
+ }
+}
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date