You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by an...@apache.org on 2016/06/22 12:35:03 UTC
[1/3] camel git commit: CAMEL-10024: sync on close and deprecation
Repository: camel
Updated Branches:
refs/heads/master 8f2bc1514 -> 76544116f
CAMEL-10024: sync on close and deprecation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fe41b1bb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fe41b1bb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fe41b1bb
Branch: refs/heads/master
Commit: fe41b1bb9ac743214a6f6baba393a251e55037fc
Parents: e784761
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Fri Jun 17 16:47:14 2016 +0200
Committer: Arno Noordover <an...@users.noreply.github.com>
Committed: Fri Jun 17 16:47:14 2016 +0200
----------------------------------------------------------------------
.../camel/component/mina2/Mina2Consumer.java | 6 +-
.../camel/component/mina2/Mina2Producer.java | 36 +++++++---
...Mina2ClientModeTcpTextlineDelimiterTest.java | 2 +-
.../mina2/Mina2DisconnectRaceConditionTest.java | 70 ++++++++++++++++++++
.../component/mina2/Mina2EncodingTest.java | 2 +-
.../mina2/Mina2ExchangeTimeOutTest.java | 2 +-
.../mina2/Mina2NoResponseFromServerTest.java | 4 +-
.../mina2/Mina2ProducerShutdownMockTest.java | 7 +-
.../mina2/Mina2ReverseProtocolHandler.java | 2 +-
.../mina2/Mina2TransferExchangeOptionTest.java | 2 +-
10 files changed, 109 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
index c2a7c50..a916bab 100644
--- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
+++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
@@ -115,7 +115,7 @@ public class Mina2Consumer extends DefaultConsumer {
if (configuration.isClientMode() && configuration.getProtocol().equals("tcp")) {
LOG.info("Disconnect from server address: {} using connector: {}", address, connector);
if (session != null) {
- CloseFuture closeFuture = session.close(true);
+ CloseFuture closeFuture = session.closeNow();
closeFuture.awaitUninterruptibly();
}
connector.dispose(true);
@@ -382,7 +382,7 @@ public class Mina2Consumer extends DefaultConsumer {
// close invalid session
if (session != null) {
LOG.warn("Closing session as an exception was thrown from MINA");
- session.close(true);
+ session.closeNow();
}
// must wrap and rethrow since cause can be of Throwable and we must only throw Exception
@@ -456,7 +456,7 @@ public class Mina2Consumer extends DefaultConsumer {
}
if (disconnect) {
LOG.debug("Closing session when complete at address: {}", address);
- session.close(true);
+ session.closeNow();
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
index fdd02dc..4337075 100644
--- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
+++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
@@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory;
public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
private static final Logger LOG = LoggerFactory.getLogger(Mina2Producer.class);
+ private final ResponseHandler handler;
private IoSession session;
private CountDownLatch latch;
private boolean lazySessionCreation;
@@ -76,6 +77,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
private Mina2Configuration configuration;
private IoSessionConfig connectorConfig;
private ExecutorService workerPool;
+ private CountDownLatch closeLatch;
public Mina2Producer(Mina2Endpoint endpoint) throws Exception {
super(endpoint);
@@ -93,6 +95,8 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
} else if (protocol.equals("vm")) {
setupVmProtocol(protocol);
}
+ handler = new ResponseHandler();
+ connector.setHandler(handler);
}
@Override
@@ -146,7 +150,6 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
// only initialize latch if we should get a response
latch = new CountDownLatch(1);
// reset handler if we expect a response
- ResponseHandler handler = (ResponseHandler) session.getHandler();
handler.reset();
}
@@ -171,7 +174,6 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
}
// did we get a response
- ResponseHandler handler = (ResponseHandler) session.getHandler();
if (handler.getCause() != null) {
throw new CamelExchangeException("Error occurred in ResponseHandler", exchange, handler.getCause());
} else if (!handler.isMessageReceived()) {
@@ -188,7 +190,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
}
}
- protected void maybeDisconnectOnDone(Exchange exchange) {
+ protected void maybeDisconnectOnDone(Exchange exchange) throws InterruptedException {
if (session == null) {
return;
}
@@ -208,7 +210,16 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
}
if (disconnect) {
LOG.debug("Closing session when complete at address: {}", address);
- session.close(true);
+ closeSessionIfNeededAndAwaitCloseInHandler(session);
+ }
+ }
+
+ private void closeSessionIfNeededAndAwaitCloseInHandler(IoSession sessionToBeClosed) throws InterruptedException {
+ closeLatch = new CountDownLatch(1);
+ if (!sessionToBeClosed.isClosing()) {
+ CloseFuture closeFuture = sessionToBeClosed.closeNow();
+ closeFuture.await(timeout, TimeUnit.MILLISECONDS);
+ closeLatch.await(timeout, TimeUnit.MILLISECONDS);
}
}
@@ -241,10 +252,9 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
super.doShutdown();
}
- private void closeConnection() {
+ private void closeConnection() throws InterruptedException {
if (session != null) {
- CloseFuture closeFuture = session.close(true);
- closeFuture.awaitUninterruptibly();
+ closeSessionIfNeededAndAwaitCloseInHandler(session);
}
connector.dispose(true);
@@ -255,14 +265,13 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
setSocketAddress(this.configuration.getProtocol());
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", new Object[]{address, connector, timeout});
+ LOG.debug("Creating connector to address: {} using connector: {} timeout: {} millis.", address, connector, timeout);
}
// connect and wait until the connection is established
if (connectorConfig != null) {
connector.getSessionConfig().setAll(connectorConfig);
}
- connector.setHandler(new ResponseHandler());
ConnectFuture future = connector.connect(address);
future.awaitUninterruptibly();
session = future.getSession();
@@ -342,7 +351,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
}
addCodecFactory(service, codecFactory);
LOG.debug("{}: Using TextLineCodecFactory: {} using encoding: {} line delimiter: {}({})",
- new Object[]{type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter});
+ type, codecFactory, charset, configuration.getTextlineDelimiter(), delimiter);
LOG.debug("Encoder maximum line length: {}. Decoder maximum line length: {}",
codecFactory.getEncoderMaxLineLength(), codecFactory.getDecoderMaxLineLength());
} else {
@@ -502,6 +511,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
// and could not return a response. We should count down to stop waiting for a response
countDown();
}
+ closeLatch.countDown();
}
@Override
@@ -512,7 +522,11 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
this.messageReceived = false;
this.cause = cause;
if (ioSession != null) {
- ioSession.close(true);
+ try {
+ closeSessionIfNeededAndAwaitCloseInHandler(ioSession);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
index ba11d11..3f7bb47 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
@@ -86,7 +86,7 @@ public class Mina2ClientModeTcpTextlineDelimiterTest extends BaseMina2Test {
private class ServerHandler extends IoHandlerAdapter {
public void sessionOpened(IoSession session) throws Exception {
session.write("Hello there!\n");
- session.close(true);
+ session.closeNow();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java
new file mode 100644
index 0000000..6b4b353
--- /dev/null
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2DisconnectRaceConditionTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.lang.reflect.Field;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.mina.core.session.IoSession;
+import org.junit.Test;
+
+public class Mina2DisconnectRaceConditionTest extends BaseMina2Test {
+
+ /**
+ * This is a test for issue CAMEL-10024 - the closing must complete before we return from the producer
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCloseSessionWhenCompleteManyTimes() throws Exception {
+ final String endpointUri = String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true&disconnect=true&minaLogger=true", getPort());
+ Mina2Producer producer = (Mina2Producer) context.getEndpoint(endpointUri).createProducer();
+ // Access session to check that the session is really closed
+ Field field = producer.getClass().getDeclaredField("session");
+ field.setAccessible(true);
+
+ for (int i = 0; i < 100; i++) {
+ Exchange e = new DefaultExchange(context, ExchangePattern.InOut);
+ e.getIn().setBody("Chad");
+ producer.process(e);
+ final IoSession ioSession = (IoSession) field.get(producer);
+ assertTrue(ioSession.getCloseFuture().isDone());
+ Object out = e.getOut().getBody();
+ assertEquals("Bye Chad", out);
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ public void configure() throws Exception {
+ from(String.format("mina2:tcp://localhost:%1$s?sync=true&textline=true", getPort())).process(new Processor() {
+
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+ exchange.getOut().setBody("Bye " + body);
+ }
+ });
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
index 63328d5..17a120f 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2EncodingTest.java
@@ -171,7 +171,7 @@ public class Mina2EncodingTest extends BaseMina2Test {
Endpoint endpoint = context.getEndpoint(uri);
Producer producer = endpoint.createProducer();
- Exchange exchange = producer.createExchange();
+ Exchange exchange = endpoint.createExchange();
exchange.getIn().setBody(hello);
producer.start();
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
index 512c1f6..d24743f 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ExchangeTimeOutTest.java
@@ -35,7 +35,7 @@ public class Mina2ExchangeTimeOutTest extends BaseMina2Test {
Endpoint endpoint = context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?textline=true&sync=true&timeout=500", getPort()));
Producer producer = endpoint.createProducer();
producer.start();
- Exchange exchange = producer.createExchange();
+ Exchange exchange = endpoint.createExchange();
exchange.getIn().setBody("Hello World");
try {
producer.process(exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
index 25b82dd..7870efc 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2NoResponseFromServerTest.java
@@ -73,7 +73,7 @@ public class Mina2NoResponseFromServerTest extends BaseMina2Test {
public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out)
throws Exception {
// close session instead of returning a reply
- ioSession.close(true);
+ ioSession.closeNow();
}
public void dispose(IoSession ioSession) throws Exception {
@@ -89,7 +89,7 @@ public class Mina2NoResponseFromServerTest extends BaseMina2Test {
public void decode(IoSession ioSession, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
// close session instead of returning a reply
- ioSession.close(true);
+ ioSession.closeNow();
}
public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput)
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
index c36bade..ab110a0 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ProducerShutdownMockTest.java
@@ -27,9 +27,10 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.mina.transport.socket.SocketConnector;
import org.junit.Test;
-import static org.easymock.classextension.EasyMock.createMock;
-import static org.easymock.classextension.EasyMock.replay;
-import static org.easymock.classextension.EasyMock.verify;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
/**
* Unit testing for using a MinaProducer that it can shutdown properly (CAMEL-395)
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
index 04579fa..6267dc0 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ReverseProtocolHandler.java
@@ -30,7 +30,7 @@ public class Mina2ReverseProtocolHandler extends IoHandlerAdapter {
public void exceptionCaught(IoSession session, Throwable cause) {
cause.printStackTrace();
// Close connection when unexpected exception is caught.
- session.close(true);
+ session.closeNow();
}
public void messageReceived(IoSession session, Object message) {
http://git-wip-us.apache.org/repos/asf/camel/blob/fe41b1bb/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
index 282d471..74b5683 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2TransferExchangeOptionTest.java
@@ -53,7 +53,7 @@ public class Mina2TransferExchangeOptionTest extends BaseMina2Test {
private Exchange sendExchange(boolean setException) throws Exception {
Endpoint endpoint = context.getEndpoint(String.format("mina2:tcp://localhost:%1$s?sync=true&encoding=UTF-8&transferExchange=true", getPort()));
Producer producer = endpoint.createProducer();
- Exchange exchange = producer.createExchange();
+ Exchange exchange = endpoint.createExchange();
//Exchange exchange = endpoint.createExchange();
Message message = exchange.getIn();
[3/3] camel git commit: CAMEL-10024: sync on close and deprecation
This closes #1043
Posted by an...@apache.org.
CAMEL-10024: sync on close and deprecation
This closes #1043
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/76544116
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/76544116
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/76544116
Branch: refs/heads/master
Commit: 76544116f587440bfa4df684242bbde895b99ce5
Parents: 8f2bc15 4d14c57
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Wed Jun 22 14:33:38 2016 +0200
Committer: Arno Noordover <an...@users.noreply.github.com>
Committed: Wed Jun 22 14:33:38 2016 +0200
----------------------------------------------------------------------
.../camel/component/mina2/Mina2Consumer.java | 6 +-
.../camel/component/mina2/Mina2Producer.java | 74 +++++++++++---------
.../component/mina2/Mina2TextLineDelimiter.java | 18 ++++-
...Mina2ClientModeTcpTextlineDelimiterTest.java | 2 +-
.../mina2/Mina2DisconnectRaceConditionTest.java | 70 ++++++++++++++++++
.../component/mina2/Mina2EncodingTest.java | 2 +-
.../mina2/Mina2ExchangeTimeOutTest.java | 2 +-
.../mina2/Mina2NoResponseFromServerTest.java | 4 +-
.../mina2/Mina2ProducerShutdownMockTest.java | 7 +-
.../mina2/Mina2ReverseProtocolHandler.java | 2 +-
.../mina2/Mina2TransferExchangeOptionTest.java | 2 +-
11 files changed, 141 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
[2/3] camel git commit: CAMEL-10024: Renamed latch and null-safety
Posted by an...@apache.org.
CAMEL-10024: Renamed latch and null-safety
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4d14c575
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4d14c575
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4d14c575
Branch: refs/heads/master
Commit: 4d14c575057f5f4610605f2cda0254c5e8c71885
Parents: fe41b1b
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Sat Jun 18 17:11:31 2016 +0200
Committer: Arno Noordover <an...@users.noreply.github.com>
Committed: Sat Jun 18 17:11:31 2016 +0200
----------------------------------------------------------------------
.../camel/component/mina2/Mina2Producer.java | 42 ++++++++------------
.../component/mina2/Mina2TextLineDelimiter.java | 18 ++++++++-
2 files changed, 34 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4d14c575/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
index 4337075..2ac6e92 100644
--- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
+++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
@@ -67,7 +67,8 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
private static final Logger LOG = LoggerFactory.getLogger(Mina2Producer.class);
private final ResponseHandler handler;
private IoSession session;
- private CountDownLatch latch;
+ private CountDownLatch responseLatch;
+ private CountDownLatch closeLatch;
private boolean lazySessionCreation;
private long timeout;
private SocketAddress address;
@@ -77,7 +78,6 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
private Mina2Configuration configuration;
private IoSessionConfig connectorConfig;
private ExecutorService workerPool;
- private CountDownLatch closeLatch;
public Mina2Producer(Mina2Endpoint endpoint) throws Exception {
super(endpoint);
@@ -147,8 +147,8 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
// if sync is true then we should also wait for a response (synchronous mode)
if (sync) {
- // only initialize latch if we should get a response
- latch = new CountDownLatch(1);
+ // only initialize responseLatch if we should get a response
+ responseLatch = new CountDownLatch(1);
// reset handler if we expect a response
handler.reset();
}
@@ -168,7 +168,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
if (sync) {
// wait for response, consider timeout
LOG.debug("Waiting for response using timeout {} millis.", timeout);
- boolean done = latch.await(timeout, TimeUnit.MILLISECONDS);
+ boolean done = responseLatch.await(timeout, TimeUnit.MILLISECONDS);
if (!done) {
throw new ExchangeTimedOutException(exchange, timeout);
}
@@ -421,21 +421,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
if (delimiter == null) {
return LineDelimiter.DEFAULT;
}
-
- switch (delimiter) {
- case DEFAULT:
- return LineDelimiter.DEFAULT;
- case AUTO:
- return LineDelimiter.AUTO;
- case UNIX:
- return LineDelimiter.UNIX;
- case WINDOWS:
- return LineDelimiter.WINDOWS;
- case MAC:
- return LineDelimiter.MAC;
- default:
- throw new IllegalArgumentException("Unknown textline delimiter: " + delimiter);
- }
+ return delimiter.getLineDelimiter();
}
private Charset getEncodingParameter(String type, Mina2Configuration configuration) {
@@ -492,11 +478,11 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
this.message = message;
messageReceived = true;
cause = null;
- countDown();
+ notifyResultAvailable();
}
- protected void countDown() {
- CountDownLatch downLatch = latch;
+ protected void notifyResultAvailable() {
+ CountDownLatch downLatch = responseLatch;
if (downLatch != null) {
downLatch.countDown();
}
@@ -509,9 +495,15 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
LOG.debug("Session closed but no message received from address: {}", address);
// session was closed but no message received. This could be because the remote server had an internal error
// and could not return a response. We should count down to stop waiting for a response
- countDown();
+ notifyResultAvailable();
+ }
+ notifySessionClosed();
+ }
+
+ private void notifySessionClosed() {
+ if (closeLatch != null) {
+ closeLatch.countDown();
}
- closeLatch.countDown();
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/4d14c575/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java
index 8bf87c7..bc83a7e 100644
--- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java
+++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2TextLineDelimiter.java
@@ -16,10 +16,26 @@
*/
package org.apache.camel.component.mina2;
+import org.apache.mina.filter.codec.textline.LineDelimiter;
+
/**
* Possible text line delimiters to be used with the textline codec.
*/
public enum Mina2TextLineDelimiter {
- DEFAULT, AUTO, UNIX, WINDOWS, MAC
+ DEFAULT(LineDelimiter.DEFAULT),
+ AUTO(LineDelimiter.AUTO),
+ UNIX(LineDelimiter.UNIX),
+ WINDOWS(LineDelimiter.WINDOWS),
+ MAC(LineDelimiter.MAC);
+
+ private final LineDelimiter lineDelimiter;
+
+ Mina2TextLineDelimiter(LineDelimiter lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ }
+
+ public LineDelimiter getLineDelimiter() {
+ return lineDelimiter;
+ }
}