You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/25 07:39:59 UTC

[1/2] git commit: Fixed test

Updated Branches:
  refs/heads/master c83e23c0c -> 1b4b63981


Fixed test


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b4b6398
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b4b6398
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b4b6398

Branch: refs/heads/master
Commit: 1b4b63981e87e9a468d969c4cdbbfcf2334992a9
Parents: b5cddbb
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jun 24 20:00:55 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jun 25 07:39:14 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/log/LogInputStreamTest.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1b4b6398/camel-core/src/test/java/org/apache/camel/component/log/LogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/log/LogInputStreamTest.java b/camel-core/src/test/java/org/apache/camel/component/log/LogInputStreamTest.java
index 37d9340..898d03c 100644
--- a/camel-core/src/test/java/org/apache/camel/component/log/LogInputStreamTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/log/LogInputStreamTest.java
@@ -41,8 +41,7 @@ public class LogInputStreamTest extends ContextTestSupport {
     public void testB() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:b");
         mock.expectedMessageCount(1);
-        // showStreams is enabled so we cannot re-read the input stream
-        mock.message(0).body(String.class).isEqualTo("");
+        mock.message(0).body(String.class).isEqualTo("Hello World");
 
         InputStream is = new ByteArrayInputStream("Hello World".getBytes());
         template.sendBody("direct:b", is);


[2/2] git commit: CAMEL-6483: Optimized camel-jetty writing response. As well IOHelper copy streams if using byte arrays as input.

Posted by da...@apache.org.
CAMEL-6483: Optimized camel-jetty writing response. As well IOHelper copy streams if using byte arrays as input.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b5cddbb2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b5cddbb2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b5cddbb2

Branch: refs/heads/master
Commit: b5cddbb2c1723502abdb2f91c3efa0203db0ca53
Parents: c83e23c
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jun 24 16:13:35 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jun 25 07:39:14 2013 +0200

----------------------------------------------------------------------
 .../converter/stream/CachedOutputStream.java    |  4 ++
 .../java/org/apache/camel/util/IOHelper.java    | 41 ++++++++++++++++----
 .../component/http/DefaultHttpBinding.java      | 39 ++++++++++++++-----
 .../jetty/SimpleJettyChunkedFalseTest.java      | 40 +++++++++++++++++++
 4 files changed, 108 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
index 4951c90..a2cf0a1 100644
--- a/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
@@ -131,6 +131,10 @@ public class CachedOutputStream extends OutputStream {
         return currentStream.hashCode();
     }
 
+    public OutputStream getCurrentStream() {
+        return currentStream;
+    }
+
     public String toString() {
         return "CachedOutputStream[size: " + totalLength + "]";
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
index ca4b82f..15278e2 100644
--- a/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
@@ -20,6 +20,8 @@ import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -34,6 +36,7 @@ import java.nio.charset.Charset;
 import java.nio.charset.UnsupportedCharsetException;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.converter.stream.CachedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -162,14 +165,32 @@ public final class IOHelper {
     public static int copy(InputStream input, OutputStream output) throws IOException {
         return copy(input, output, DEFAULT_BUFFER_SIZE);
     }
-    
+
     public static int copy(final InputStream input, final OutputStream output, int bufferSize) throws IOException {
-        int avail = input.available();
-        if (avail > 262144) {
-            avail = 262144;
+        return copy(input, output, bufferSize, false);
+    }
+
+    public static int copy(final InputStream input, final OutputStream output, int bufferSize, boolean flushOnEachWrite) throws IOException {
+        if (input instanceof ByteArrayInputStream) {
+            // optimized for byte array as we only need the max size it can be
+            input.mark(0);
+            input.reset();
+            bufferSize = input.available();
+        } else {
+            int avail = input.available();
+            if (avail > bufferSize) {
+                bufferSize = avail;
+            }
         }
-        if (avail > bufferSize) {
-            bufferSize = avail;
+
+        if (bufferSize > 262144) {
+            // upper cap to avoid buffers too big
+            bufferSize = 262144;
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Copying InputStream: {} -> OutputStream: {} with buffer: {} and flush on each write {}",
+                    new Object[]{input, output, bufferSize, flushOnEachWrite});
         }
 
         final byte[] buffer = new byte[bufferSize];
@@ -177,10 +198,16 @@ public final class IOHelper {
         int total = 0;
         while (-1 != n) {
             output.write(buffer, 0, n);
+            if (flushOnEachWrite) {
+                output.flush();
+            }
             total += n;
             n = input.read(buffer);
         }
-        output.flush();
+        if (!flushOnEachWrite) {
+            // flush at end, if we didn't do it during the writing
+            output.flush();
+        }
         return total;
     }
     

http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java b/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java
index e291480..f58b4e1 100644
--- a/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java
+++ b/components/camel-http/src/main/java/org/apache/camel/component/http/DefaultHttpBinding.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.http;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -308,10 +309,11 @@ public class DefaultHttpBinding implements HttpBinding {
         return false;
     }
     
-    protected void copyStream(InputStream is, OutputStream os) throws IOException {
+    protected int copyStream(InputStream is, OutputStream os, int bufferSize) throws IOException {
         try {
-            // copy directly from input stream to output stream
-            IOHelper.copy(is, os);
+            // copy stream, and must flush on each write as etc Jetty has better performance when
+            // flushing after writing to its servlet output stream
+            return IOHelper.copy(is, os, bufferSize, true);
         } finally {
             IOHelper.close(os, is);
         }
@@ -344,31 +346,47 @@ public class DefaultHttpBinding implements HttpBinding {
 
         if (is != null) {
             ServletOutputStream os = response.getOutputStream();
-            LOG.trace("Writing direct response from source input stream to servlet output stream");
             if (!checkChunked(message, exchange)) {
                 CachedOutputStream stream = new CachedOutputStream(exchange);
                 try {
                     // copy directly from input stream to the cached output stream to get the content length
-                    int len = IOHelper.copy(is, stream);
+                    int len = copyStream(is, stream, response.getBufferSize());
                     // we need to setup the length if message is not chucked
                     response.setContentLength(len);
-                    copyStream(stream.getInputStream(), os);
+                    OutputStream current = stream.getCurrentStream();
+                    if (current instanceof ByteArrayOutputStream) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Streaming (direct) response in non-chunked mode with content-length {}");
+                        }
+                        ByteArrayOutputStream bos = (ByteArrayOutputStream) current;
+                        bos.writeTo(os);
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Streaming response in non-chunked mode with content-length {} and buffer size: {}", len, len);
+                        }
+                        copyStream(stream.getInputStream(), os, len);
+                    }
                 } finally {
-                    IOHelper.close(is, stream);
+                    IOHelper.close(is, os);
                 }
             } else {
-                copyStream(is, os);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Streaming response in chunked mode with buffer size {}", response.getBufferSize());
+                }
+                copyStream(is, os, response.getBufferSize());
             }
         } else {
             // not convertable as a stream so fallback as a String
             String data = message.getBody(String.class);
             if (data != null) {
-                LOG.debug("Cannot write from source input stream, falling back to using String content. For binary content this can be a problem.");
                 // set content length and encoding before we write data
                 String charset = IOHelper.getCharsetName(exchange, true);
                 final int dataByteLength = data.getBytes(charset).length;
                 response.setCharacterEncoding(charset);
                 response.setContentLength(dataByteLength);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Writing response in non-chunked mode as plain text with content-length {} and buffer size: {}", dataByteLength, response.getBufferSize());
+                }
                 try {
                     response.getWriter().print(data);
                 } finally {
@@ -403,6 +421,9 @@ public class DefaultHttpBinding implements HttpBinding {
         byte[] data = GZIPHelper.compressGZIP(bytes);
         ServletOutputStream os = response.getOutputStream();
         try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Streaming response as GZIP in non-chunked mode with content-length {} and buffer size: {}", data.length, response.getBufferSize());
+            }
             response.setContentLength(data.length);
             os.write(data);
             os.flush();

http://git-wip-us.apache.org/repos/asf/camel/blob/b5cddbb2/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java
new file mode 100644
index 0000000..56114d2
--- /dev/null
+++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/SimpleJettyChunkedFalseTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class SimpleJettyChunkedFalseTest extends BaseJettyTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        String result = template.requestBody("http://localhost:{{port}}/myapp", "Camel", String.class);
+        assertEquals("Hello Camel", result);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("jetty:http://localhost:{{port}}/myapp?chunked=false")
+                    .transform(body().prepend("Hello "));
+            }
+        };
+    }
+
+}