You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/02/07 17:45:18 UTC
[4/9] camel git commit: CAMEL-10756 Mina2 Producer "hang" until
timeout if the response message could not be decoded * Adapt exception caught
CAMEL-10756: Mina2 Producer "hangs" until timeout if the response message
could not be decoded
* Adapt the exception that is caught
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6adb9216
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6adb9216
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6adb9216
Branch: refs/heads/camel-2.18.x
Commit: 6adb92164a1f4ecc4b434498c0ce3b4827b26fef
Parents: 11ba337
Author: Thomas Papke <th...@icw.de>
Authored: Mon Feb 6 13:32:47 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Feb 7 18:41:30 2017 +0100
----------------------------------------------------------------------
.../camel/component/mina2/Mina2Producer.java | 8 ++--
.../component/mina2/Mina2CustomCodecTest.java | 49 ++++++++++++++++++++
2 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6adb9216/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
index fc7b4a2..7dd1795 100644
--- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
+++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Producer.java
@@ -111,6 +111,7 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
return false;
}
+ @Override
public void process(Exchange exchange) throws Exception {
try {
doProcess(exchange);
@@ -512,11 +513,8 @@ public class Mina2Producer extends DefaultProducer implements ServicePoolAware {
this.messageReceived = false;
this.cause = cause;
if (ioSession != null) {
- try {
- closeSessionIfNeededAndAwaitCloseInHandler(ioSession);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ CloseFuture closeFuture = ioSession.closeNow();
+ closeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6adb9216/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java
index 427a0ab..3a86190 100644
--- a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2CustomCodecTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.mina2;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -48,9 +52,29 @@ public class Mina2CustomCodecTest extends BaseMina2Test {
}
@Test
+ public void testProducerFailInDecodingResponse() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ try {
+ template.requestBody(String.format("mina2:tcp://localhost:%1$s?sync=true&codec=#failingCodec", getPort()), "Hello World");
+ fail("Expecting that decode of result fails");
+ } catch (Exception e){
+ assertTrue(e instanceof CamelExecutionException);
+ Optional<Throwable> rootCause = Stream.iterate(e, Throwable::getCause)
+ .filter(element -> element.getCause() == null).findFirst();
+ assertTrue(rootCause.isPresent());
+ assertTrue(rootCause.get() instanceof IllegalArgumentException);
+ assertTrue(rootCause.get().getMessage().contains("Something went wrong in decode"));
+ }
+
+ }
+
+ @Test
public void testTCPEncodeUTF8InputIsString() throws Exception {
final String myUri = String.format("mina2:tcp://localhost:%1$s?encoding=UTF-8&sync=false", getNextPort());
context.addRoutes(new RouteBuilder() {
+ @Override
public void configure() {
from(myUri).to("mock:result");
}
@@ -78,15 +102,19 @@ public class Mina2CustomCodecTest extends BaseMina2Test {
}
}
+ @Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry jndi = super.createRegistry();
jndi.bind("myCodec", new MyCodec());
+ jndi.bind("failingCodec", new MyCodec(true));
return jndi;
}
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
+ @Override
public void configure() throws Exception {
from(String.format("mina2:tcp://localhost:%1$s?sync=true&codec=#myCodec", getPort())).transform(constant("Bye World")).to("mock:result");
}
@@ -95,9 +123,22 @@ public class Mina2CustomCodecTest extends BaseMina2Test {
private static class MyCodec implements ProtocolCodecFactory {
+ private final boolean failing;
+
+ public MyCodec(boolean failing) {
+ this.failing = failing;
+
+ }
+
+ public MyCodec() {
+ this.failing = false;
+ }
+
+ @Override
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return new ProtocolEncoder() {
+ @Override
public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput out)
throws Exception {
IoBuffer bb = IoBuffer.allocate(32).setAutoExpand(true);
@@ -107,6 +148,7 @@ public class Mina2CustomCodecTest extends BaseMina2Test {
out.write(bb);
}
+ @Override
public void dispose(IoSession ioSession) throws Exception {
// do nothing
}
@@ -114,10 +156,17 @@ public class Mina2CustomCodecTest extends BaseMina2Test {
}
+ @Override
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return new CumulativeProtocolDecoder() {
+ @Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
+ if (failing){
+ throw new IllegalArgumentException("Something went wrong in decode");
+ }
+
+
if (in.remaining() > 0) {
byte[] buf = new byte[in.remaining()];
in.get(buf);