You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2012/05/18 02:42:56 UTC

svn commit: r1339936 - in /camel/branches/camel-2.9.x/components/camel-stream/src: main/java/org/apache/camel/component/stream/ test/java/org/apache/camel/component/stream/

Author: hadrian
Date: Fri May 18 00:42:55 2012
New Revision: 1339936

URL: http://svn.apache.org/viewvc?rev=1339936&view=rev
Log:
CAMEL-5284. Do not close stream after each write

Modified:
    camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
    camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
    camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java

Modified: camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=1339936&r1=1339935&r2=1339936&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original)
+++ camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Fri May 18 00:42:55 2012
@@ -29,17 +29,20 @@ import org.slf4j.LoggerFactory;
 public class StreamEndpoint extends DefaultEndpoint {
     private static final transient Logger LOG = LoggerFactory.getLogger(StreamEndpoint.class);
 
+    private String url;
+
     private String fileName;
     private boolean scanStream;
     private boolean retry;
     private long scanStreamDelay;
-    private String url;
     private long delay;
     private String encoding;
     private String promptMessage;
     private long promptDelay;
     private long initialPromptDelay = 2000;
     private int groupLines;
+    private int autoCloseCount = 0;
+    
     private Charset charset;
 
     public StreamEndpoint(String endpointUri, Component component) throws Exception {
@@ -154,6 +157,14 @@ public class StreamEndpoint extends Defa
         this.groupLines = groupLines;
     }
     
+    public int getAutoCloseCount() {
+        return autoCloseCount;
+    }
+
+    public void setAutoCloseCount(int autoCloseCount) {
+        this.autoCloseCount = autoCloseCount;
+    }
+
     public Charset getCharset() {
         return charset;
     }
@@ -176,5 +187,4 @@ public class StreamEndpoint extends Defa
 
         return Charset.forName(encoding);
     }
-
 }

Modified: camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=1339936&r1=1339935&r2=1339936&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original)
+++ camel/branches/camel-2.9.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Fri May 18 00:42:55 2012
@@ -28,6 +28,7 @@ import java.net.URLConnection;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
@@ -47,6 +48,8 @@ public class StreamProducer extends Defa
     private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
     private StreamEndpoint endpoint;
     private String uri;
+    private OutputStream outputStream;
+    private AtomicInteger count = new AtomicInteger();
 
     public StreamProducer(StreamEndpoint endpoint, String uri) throws Exception {
         super(endpoint);
@@ -54,26 +57,25 @@ public class StreamProducer extends Defa
         validateUri(uri);
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        closeStream(true);
+    }
+
     public void process(Exchange exchange) throws Exception {
         delay(endpoint.getDelay());
-        OutputStream outputStream = System.out;         
-        boolean isSystemStream = false;
-        if ("out".equals(uri)) {
-            isSystemStream = true;
-            outputStream = System.out;
-        } else if ("err".equals(uri)) {
-            isSystemStream = true;
-            outputStream = System.err;
-        } else if ("file".equals(uri)) {
-            outputStream = resolveStreamFromFile();
-        } else if ("header".equals(uri)) {
-            outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"), exchange);
-        } else if ("url".equals(uri)) {
-            outputStream = resolveStreamFromUrl();
-        }
 
-        writeToStream(outputStream, exchange);
-        closeStream(outputStream, isSystemStream);
+        synchronized(this) {
+            openStream(exchange);
+            writeToStream(outputStream, exchange);
+            closeStream(false);
+        }
     }
 
     private OutputStream resolveStreamFromUrl() throws IOException {
@@ -108,7 +110,7 @@ public class StreamProducer extends Defa
         Thread.sleep(ms);
     }
 
-    private void writeToStream(OutputStream outputStream, Exchange exchange) throws IOException, CamelExchangeException {
+    private synchronized void writeToStream(OutputStream outputStream, Exchange exchange) throws IOException, CamelExchangeException {
         Object body = exchange.getIn().getBody();
 
         // if not a string then try as byte array first
@@ -134,10 +136,52 @@ public class StreamProducer extends Defa
         bw.flush();
     }
 
-    private void closeStream(OutputStream outputStream, boolean isSystemStream) throws Exception {
-        // important: do not close the writer on a standard system.out etc.
-        if (outputStream != null && !isSystemStream) {
+    private synchronized void openStream() throws Exception {
+        if (outputStream != null) {
+            return;
+        }
+
+        if ("out".equals(uri)) {
+            outputStream = System.out;
+        } else if ("err".equals(uri)) {
+            outputStream = System.err;
+        } else if ("file".equals(uri)) {
+            outputStream = resolveStreamFromFile();
+        } else if ("url".equals(uri)) {
+            outputStream = resolveStreamFromUrl();
+        }
+        count.set(outputStream == null ? 0 : endpoint.getAutoCloseCount());
+        LOG.debug("Opened stream '{}'", endpoint.getEndpointKey());
+    }
+
+    private synchronized void openStream(final Exchange exchange) throws Exception {
+        if (outputStream != null) {
+            return;
+        }
+        if ("header".equals(uri)) {
+            outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"), exchange);
+            LOG.debug("Opened stream '{}'", endpoint.getEndpointKey());
+        } else {
+            openStream();
+        }
+    }
+
+    private synchronized void closeStream(boolean force) throws Exception {
+        if (outputStream == null) {
+            return;
+        }
+
+        // never close a standard stream (system.out or system.err)
+        // always close a 'header' stream (unless it's a system stream)
+        boolean systemStream = outputStream != System.out || outputStream != System.err;
+        boolean headerStream = "header".equals(uri) && !systemStream;
+        boolean reachedLimit = endpoint.getAutoCloseCount() > 0 && count.decrementAndGet() <= 0;
+        boolean expiredStream = force || headerStream || reachedLimit;  // evaluation order is important!
+
+        if (expiredStream) {
             outputStream.close();
+            outputStream = null;
+            LOG.debug("Closed stream '{}'", endpoint.getEndpointKey());
         }
     }
 
@@ -161,4 +205,3 @@ public class StreamProducer extends Defa
         }
     }
 }
-

Modified: camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java?rev=1339936&r1=1339935&r2=1339936&view=diff
==============================================================================
--- camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java (original)
+++ camel/branches/camel-2.9.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java Fri May 18 00:42:55 2012
@@ -83,20 +83,21 @@ public class StreamFileTest extends Came
     @Test
     public void testFileProducer() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hadrian", "Camel");
+        mock.expectedMessageCount(3);
         
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start").routeId("produce")
-                    .to("stream:file?fileName=target/stream/StreamFileTest.txt");
-                from("file://target/stream?fileName=StreamFileTest.txt").routeId("consume").autoStartup(false)
+                    .to("stream:file?fileName=target/stream/StreamFileTest.txt&autoCloseCount=2");
+                from("file://target/stream?fileName=StreamFileTest.txt&noop=true").routeId("consume").autoStartup(false)
                     .split().tokenize("\n").to("mock:result");
             }
         });
         context.start();
 
         template.sendBody("direct:start", "Hadrian");
+        template.sendBody("direct:start", "Apache");
         template.sendBody("direct:start", "Camel");
         
         context.startRoute("consume");