You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/01/14 15:53:59 UTC
[1/3] camel git commit: Cleaned-up logging; Moved final ServerSocket close() to a finally block
Repository: camel
Updated Branches:
refs/heads/master 15e72ee2a -> bfb0a8f0f
Cleaned-up logging; Moved final ServerSocket close() to a finally block
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3a362586
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3a362586
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3a362586
Branch: refs/heads/master
Commit: 3a3625863ab569b2ef051f231ca7975dd7ad445e
Parents: 5b0fbd3
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Tue Jan 12 09:06:00 2016 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 14 14:22:39 2016 +0000
----------------------------------------------------------------------
.../component/mllp/MllpTcpServerConsumer.java | 168 ++++++++++---------
1 file changed, 85 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3a362586/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 33568da..05d13f5 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -194,107 +194,109 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
public void run() {
log.debug("Starting acceptor thread");
- while (null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
- /* ? set this here ? */
- // serverSocket.setSoTimeout( 10000 );
+ try {
+ while (!isInterrupted() && null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
// TODO: Need to check maxConnections and figure out what to do when exceeded
- Socket socket = null;
- try {
- socket = serverSocket.accept();
- } catch (SocketException socketEx) {
- // This should happen if the component is closed while the accept call is blocking
- if (serverSocket.isBound()) {
- try {
- serverSocket.close();
- } catch (Exception ex) {
- log.info("Exception encountered closing ServerSocket after SocketException on accept()", ex);
+ Socket socket = null;
+ try {
+ socket = serverSocket.accept();
+ } catch (SocketException socketEx) {
+ // This should happen if the component is closed while the accept call is blocking
+ if (serverSocket.isBound()) {
+ try {
+ serverSocket.close();
+ } catch (Exception ex) {
+ log.debug("Exception encountered closing ServerSocket after SocketException on accept() - ignoring", ex);
+ }
}
- }
- } catch (IOException ioEx) {
- log.error("Exception encountered accepting connection - closing ServerSocket", ioEx);
- if (serverSocket.isBound()) {
- try {
- serverSocket.close();
- } catch (Exception ex) {
- log.info("Exception encountered closing ServerSocket after exception on accept()", ex);
+ continue;
+ } catch (IOException ioEx) {
+ log.error("Exception encountered accepting connection - closing ServerSocket", ioEx);
+ if (serverSocket.isBound()) {
+ try {
+ serverSocket.close();
+ } catch (Exception ex) {
+ log.debug("Exception encountered closing ServerSocket after exception on accept() - ignoring", ex);
+ }
}
+ continue;
}
- }
- try {
+ try {
/* Wait a bit and then check and see if the socket is really there - it could be a load balancer
pinging the port
*/
- Thread.sleep(100);
- if (socket.isConnected() && !socket.isClosed()) {
- log.debug("Socket appears to be there - check for available data");
- InputStream inputStream;
- try {
- inputStream = socket.getInputStream();
- } catch (IOException ioEx) {
- // Bad Socket -
- log.warn("Failed to retrieve the InputStream for socket after the initial connection was accepted");
- MllpUtil.resetConnection(socket);
- continue;
- }
+ Thread.sleep(100);
+ if (socket.isConnected() && !socket.isClosed()) {
+ log.debug("Socket appears to be there - check for available data");
+ InputStream inputStream;
+ try {
+ inputStream = socket.getInputStream();
+ } catch (IOException ioEx) {
+ // Bad Socket -
+ log.warn("Failed to retrieve the InputStream for socket after the initial connection was accepted");
+ MllpUtil.resetConnection(socket);
+ continue;
+ }
- if (0 < inputStream.available()) {
- // Something is there - start the client thread
- ClientSocketThread clientThread = new ClientSocketThread(socket, null);
- clientThreads.add(clientThread);
- clientThread.start();
- continue;
- }
+ if (0 < inputStream.available()) {
+ // Something is there - start the client thread
+ ClientSocketThread clientThread = new ClientSocketThread(socket, null);
+ clientThreads.add(clientThread);
+ clientThread.start();
+ continue;
+ }
- // The easy check failed - so trigger a blocking read
- socket.setSoTimeout(100);
- try {
- int tmpByte = inputStream.read();
- socket.setSoTimeout(endpoint.receiveTimeout);
- if (-1 == tmpByte) {
- log.debug("Socket.read() returned END_OF_STREAM - resetting connection");
- MllpUtil.resetConnection(socket);
- } else {
- ClientSocketThread clientThread = new ClientSocketThread(socket, tmpByte);
+ // The easy check failed - so trigger a blocking read
+ socket.setSoTimeout(100);
+ try {
+ int tmpByte = inputStream.read();
+ socket.setSoTimeout(endpoint.receiveTimeout);
+ if (-1 == tmpByte) {
+ log.debug("Socket.read() returned END_OF_STREAM - resetting connection");
+ MllpUtil.resetConnection(socket);
+ } else {
+ ClientSocketThread clientThread = new ClientSocketThread(socket, tmpByte);
+ clientThreads.add(clientThread);
+ clientThread.start();
+ }
+ } catch (SocketTimeoutException timeoutEx) {
+ // No data, but the socket is there
+ log.debug("No Data - but the socket is there. Starting ClientSocketThread");
+ ClientSocketThread clientThread = new ClientSocketThread(socket, null);
clientThreads.add(clientThread);
clientThread.start();
}
- } catch (SocketTimeoutException timeoutEx) {
- // No data, but the socket is there
- log.debug("No Data - but the socket is there. Starting ClientSocketThread");
- ClientSocketThread clientThread = new ClientSocketThread(socket, null);
- clientThreads.add(clientThread);
- clientThread.start();
}
- }
- } catch (SocketTimeoutException timeoutEx) {
- // No new clients
- log.trace("SocketTimeoutException waiting for new connections - no new connections");
-
- for (int i = clientThreads.size() - 1; i >= 0; --i) {
- ClientSocketThread thread = clientThreads.get(i);
- if (!thread.isAlive()) {
- clientThreads.remove(i);
+ } catch (SocketTimeoutException timeoutEx) {
+ // No new clients
+ log.trace("SocketTimeoutException waiting for new connections - no new connections");
+
+ for (int i = clientThreads.size() - 1; i >= 0; --i) {
+ ClientSocketThread thread = clientThreads.get(i);
+ if (!thread.isAlive()) {
+ clientThreads.remove(i);
+ }
}
+ } catch (InterruptedException interruptEx) {
+ log.debug("accept loop interrupted - closing ServerSocket");
+ try {
+ serverSocket.close();
+ } catch (Exception ex) {
+ log.debug("Exception encountered closing ServerSocket after InterruptedException - ignoring", ex);
+ }
+ } catch (Exception ex) {
+ log.error("Exception accepting new connection - retrying", ex);
}
- } catch (InterruptedException interruptEx) {
- log.info("accept loop interrupted - closing ServerSocket");
+ }
+ } finally {
+ log.debug("ServerSocket.accept loop finished - closing listener");
+ if (null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (Exception ex) {
- log.warn("Exception encountered closing ServerSocket after InterruptedException", ex);
+ log.debug("Exception encountered closing ServerSocket after accept loop had exited - ignoring", ex);
}
- } catch (Exception ex) {
- log.error("Exception accepting new connection", ex);
- }
- }
-
- log.info("ServerSocket.accept loop finished - closing listener");
- if (null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
- try {
- serverSocket.close();
- } catch (Exception ex) {
- log.warn("Exception encountered closing ServerSocket after accept loop had exited", ex);
}
}
}
@@ -306,8 +308,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
if (serverSocket.isBound()) {
try {
serverSocket.close();
- } catch (IOException ioEx ) {
- log.warn("Exception encountered closing ServerSocket in interrupt() method", ioEx);
+ } catch (IOException ioEx) {
+ log.warn("Exception encountered closing ServerSocket in interrupt() method - ignoring", ioEx);
}
}
}
[3/3] camel git commit: Added simple test as example
Posted by da...@apache.org.
Added simple test as example
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bfb0a8f0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bfb0a8f0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bfb0a8f0
Branch: refs/heads/master
Commit: bfb0a8f0f0e53f2d65f1d00ef6838e741ba12005
Parents: 3a36258
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jan 13 16:57:28 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 14 14:22:40 2016 +0000
----------------------------------------------------------------------
.../camel/test/patterns/SimpleMockTest.java | 40 ++++++++++++++
.../camel/test/patterns/SimpleMockTest.xml | 31 +++++++++++
.../test/patterns/SimpleNotifyBuilderTest.java | 57 ++++++++++++++++++++
3 files changed, 128 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb0a8f0/components/camel-test-spring/src/test/java/org/apache/camel/test/patterns/SimpleMockTest.java
----------------------------------------------------------------------
diff --git a/components/camel-test-spring/src/test/java/org/apache/camel/test/patterns/SimpleMockTest.java b/components/camel-test-spring/src/test/java/org/apache/camel/test/patterns/SimpleMockTest.java
new file mode 100644
index 0000000..c1d82cf
--- /dev/null
+++ b/components/camel-test-spring/src/test/java/org/apache/camel/test/patterns/SimpleMockTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.patterns;
+
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class SimpleMockTest extends CamelSpringTestSupport {
+
+ @Override
+ protected AbstractApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/test/patterns/SimpleMockTest.xml");
+ }
+
+ @Test
+ public void testMock() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb0a8f0/components/camel-test-spring/src/test/resources/org/apache/camel/test/patterns/SimpleMockTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-test-spring/src/test/resources/org/apache/camel/test/patterns/SimpleMockTest.xml b/components/camel-test-spring/src/test/resources/org/apache/camel/test/patterns/SimpleMockTest.xml
new file mode 100644
index 0000000..b3f28a3
--- /dev/null
+++ b/components/camel-test-spring/src/test/resources/org/apache/camel/test/patterns/SimpleMockTest.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd ">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="direct:start"/>
+ <to uri="mock:result"/>
+ </route>
+ </camelContext>
+
+</beans>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/bfb0a8f0/components/camel-test/src/test/java/org/apache/camel/test/patterns/SimpleNotifyBuilderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-test/src/test/java/org/apache/camel/test/patterns/SimpleNotifyBuilderTest.java b/components/camel-test/src/test/java/org/apache/camel/test/patterns/SimpleNotifyBuilderTest.java
new file mode 100644
index 0000000..bb76dc9
--- /dev/null
+++ b/components/camel-test/src/test/java/org/apache/camel/test/patterns/SimpleNotifyBuilderTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.patterns;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class SimpleNotifyBuilderTest extends CamelTestSupport {
+
+ @Test
+ public void testNotifyBuilder() throws Exception {
+ NotifyBuilder notify = new NotifyBuilder(context)
+ .from("seda:start")
+ .wereSentTo("seda:queue")
+ .whenDone(10)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("seda:start", "Camel" + i);
+ }
+
+ boolean matches = notify.matches(10, TimeUnit.SECONDS);
+ assertTrue(matches);
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start")
+ .transform(simple("Bye ${body}"))
+ .to("seda:queue");
+ }
+ };
+ }
+
+}
[2/3] camel git commit: Improve shutdown of listener for consumer
Posted by da...@apache.org.
Improve shutdown of listener for consumer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b0fbd34
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b0fbd34
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b0fbd34
Branch: refs/heads/master
Commit: 5b0fbd344a1f4fff3c848691a38d4faf6029e322
Parents: 15e72ee
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Mon Jan 11 14:15:10 2016 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 14 14:22:39 2016 +0000
----------------------------------------------------------------------
.../component/mllp/MllpTcpServerConsumer.java | 48 +++++++++++++++++++-
1 file changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5b0fbd34/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index 186a462..33568da 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -194,12 +195,33 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
log.debug("Starting acceptor thread");
while (null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
- try {
/* ? set this here ? */
// serverSocket.setSoTimeout( 10000 );
// TODO: Need to check maxConnections and figure out what to do when exceeded
- Socket socket = serverSocket.accept();
+ Socket socket = null;
+ try {
+ socket = serverSocket.accept();
+ } catch (SocketException socketEx) {
+ // This should happen if the component is closed while the accept call is blocking
+ if (serverSocket.isBound()) {
+ try {
+ serverSocket.close();
+ } catch (Exception ex) {
+ log.info("Exception encountered closing ServerSocket after SocketException on accept()", ex);
+ }
+ }
+ } catch (IOException ioEx) {
+ log.error("Exception encountered accepting connection - closing ServerSocket", ioEx);
+ if (serverSocket.isBound()) {
+ try {
+ serverSocket.close();
+ } catch (Exception ex) {
+ log.info("Exception encountered closing ServerSocket after exception on accept()", ex);
+ }
+ }
+ }
+ try {
/* Wait a bit and then check and see if the socket is really there - it could be a load balancer
pinging the port
*/
@@ -266,8 +288,30 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
log.error("Exception accepting new connection", ex);
}
}
+
+ log.info("ServerSocket.accept loop finished - closing listener");
+ if (null != serverSocket && serverSocket.isBound() && !serverSocket.isClosed()) {
+ try {
+ serverSocket.close();
+ } catch (Exception ex) {
+ log.warn("Exception encountered closing ServerSocket after accept loop had exited", ex);
+ }
+ }
}
+ @Override
+ public void interrupt() {
+ super.interrupt();
+ if (null != serverSocket) {
+ if (serverSocket.isBound()) {
+ try {
+ serverSocket.close();
+ } catch (IOException ioEx ) {
+ log.warn("Exception encountered closing ServerSocket in interrupt() method", ioEx);
+ }
+ }
+ }
+ }
}
class ClientSocketThread extends Thread {