You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2012/05/18 02:42:56 UTC
svn commit: r1339936 - in
/camel/branches/camel-2.9.x/components/camel-stream/src:
main/java/org/apache/camel/component/stream/
test/java/org/apache/camel/component/stream/
Author: hadrian
Date: Fri May 18 00:42:55 2012
New Revision: 1339936
URL: http://svn.apache.org/viewvc?rev=1339936&view=rev
Log:
CAMEL-5284. Do not close stream after each write
Modified:
camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
Modified: camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=1339936&r1=1339935&r2=1339936&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original)
+++ camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Fri May 18 00:42:55 2012
@@ -29,17 +29,20 @@ import org.slf4j.LoggerFactory;
public class StreamEndpoint extends DefaultEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(StreamEndpoint.class);
+ private String url;
+
private String fileName;
private boolean scanStream;
private boolean retry;
private long scanStreamDelay;
- private String url;
private long delay;
private String encoding;
private String promptMessage;
private long promptDelay;
private long initialPromptDelay = 2000;
private int groupLines;
+ private int autoCloseCount = 0;
+
private Charset charset;
public StreamEndpoint(String endpointUri, Component component) throws Exception {
@@ -154,6 +157,14 @@ public class StreamEndpoint extends Defa
this.groupLines = groupLines;
}
+ public int getAutoCloseCount() {
+ return autoCloseCount;
+ }
+
+ public void setAutoCloseCount(int autoCloseCount) {
+ this.autoCloseCount = autoCloseCount;
+ }
+
public Charset getCharset() {
return charset;
}
@@ -176,5 +187,4 @@ public class StreamEndpoint extends Defa
return Charset.forName(encoding);
}
-
}
Modified: camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=1339936&r1=1339935&r2=1339936&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original)
+++ camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Fri May 18 00:42:55 2012
@@ -28,6 +28,7 @@ import java.net.URLConnection;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
@@ -47,6 +48,8 @@ public class StreamProducer extends Defa
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
private StreamEndpoint endpoint;
private String uri;
+ private OutputStream outputStream;
+ private AtomicInteger count = new AtomicInteger();
public StreamProducer(StreamEndpoint endpoint, String uri) throws Exception {
super(endpoint);
@@ -54,26 +57,25 @@ public class StreamProducer extends Defa
validateUri(uri);
}
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ closeStream(true);
+ }
+
public void process(Exchange exchange) throws Exception {
delay(endpoint.getDelay());
- OutputStream outputStream = System.out;
- boolean isSystemStream = false;
- if ("out".equals(uri)) {
- isSystemStream = true;
- outputStream = System.out;
- } else if ("err".equals(uri)) {
- isSystemStream = true;
- outputStream = System.err;
- } else if ("file".equals(uri)) {
- outputStream = resolveStreamFromFile();
- } else if ("header".equals(uri)) {
- outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"), exchange);
- } else if ("url".equals(uri)) {
- outputStream = resolveStreamFromUrl();
- }
- writeToStream(outputStream, exchange);
- closeStream(outputStream, isSystemStream);
+ synchronized(this) {
+ openStream(exchange);
+ writeToStream(outputStream, exchange);
+ closeStream(false);
+ }
}
private OutputStream resolveStreamFromUrl() throws IOException {
@@ -108,7 +110,7 @@ public class StreamProducer extends Defa
Thread.sleep(ms);
}
- private void writeToStream(OutputStream outputStream, Exchange exchange) throws IOException, CamelExchangeException {
+ private synchronized void writeToStream(OutputStream outputStream, Exchange exchange) throws IOException, CamelExchangeException {
Object body = exchange.getIn().getBody();
// if not a string then try as byte array first
@@ -134,10 +136,52 @@ public class StreamProducer extends Defa
bw.flush();
}
- private void closeStream(OutputStream outputStream, boolean isSystemStream) throws Exception {
- // important: do not close the writer on a standard system.out etc.
- if (outputStream != null && !isSystemStream) {
+ private synchronized void openStream() throws Exception {
+ if (outputStream != null) {
+ return;
+ }
+
+ if ("out".equals(uri)) {
+ outputStream = System.out;
+ } else if ("err".equals(uri)) {
+ outputStream = System.err;
+ } else if ("file".equals(uri)) {
+ outputStream = resolveStreamFromFile();
+ } else if ("url".equals(uri)) {
+ outputStream = resolveStreamFromUrl();
+ }
+ count.set(outputStream == null ? 0 : endpoint.getAutoCloseCount());
+ LOG.debug("Opened stream '{}'", endpoint.getEndpointKey());
+ }
+
+ private synchronized void openStream(final Exchange exchange) throws Exception {
+ if (outputStream != null) {
+ return;
+ }
+ if ("header".equals(uri)) {
+ outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"), exchange);
+ LOG.debug("Opened stream '{}'", endpoint.getEndpointKey());
+ } else {
+ openStream();
+ }
+ }
+
+ private synchronized void closeStream(boolean force) throws Exception {
+ if (outputStream == null) {
+ return;
+ }
+
+ // never close a standard stream (system.out or system.err)
+ // always close a 'header' stream (unless it's a system stream)
+ boolean systemStream = outputStream != System.out || outputStream != System.err;
+ boolean headerStream = "header".equals(uri) && !systemStream;
+ boolean reachedLimit = endpoint.getAutoCloseCount() > 0 && count.decrementAndGet() <= 0;
+ boolean expiredStream = force || headerStream || reachedLimit; // evaluation order is important!
+
+ if (expiredStream) {
outputStream.close();
+ outputStream = null;
+ LOG.debug("Closed stream '{}'", endpoint.getEndpointKey());
}
}
@@ -161,4 +205,3 @@ public class StreamProducer extends Defa
}
}
}
-
Modified: camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java?rev=1339936&r1=1339935&r2=1339936&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java (original)
+++ camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java Fri May 18 00:42:55 2012
@@ -83,20 +83,21 @@ public class StreamFileTest extends Came
@Test
public void testFileProducer() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hadrian", "Camel");
+ mock.expectedMessageCount(3);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start").routeId("produce")
- .to("stream:file?fileName=target/stream/StreamFileTest.txt");
- from("file://target/stream?fileName=StreamFileTest.txt").routeId("consume").autoStartup(false)
+ .to("stream:file?fileName=target/stream/StreamFileTest.txt&autoCloseCount=2");
+ from("file://target/stream?fileName=StreamFileTest.txt&noop=true").routeId("consume").autoStartup(false)
.split().tokenize("\n").to("mock:result");
}
});
context.start();
template.sendBody("direct:start", "Hadrian");
+ template.sendBody("direct:start", "Apache");
template.sendBody("direct:start", "Camel");
context.startRoute("consume");