You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/05 13:33:37 UTC

camel git commit: CAMEL-9274: Data formats unmarshal should use OutputStreamBuilder so we can better support stream caching. Many thanks to Aaron Whiteside for the patch.

Repository: camel
Updated Branches:
  refs/heads/master 205a99986 -> 53f3afaf3


CAMEL-9274: Data formats unmarshal should use OutputStreamBuilder so we can better support stream caching. Many thanks to Aaron Whiteside for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/53f3afaf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/53f3afaf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/53f3afaf

Branch: refs/heads/master
Commit: 53f3afaf34ad1e079d93b583ecdf159d357f6bb7
Parents: 205a999
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Nov 5 13:36:46 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Nov 5 13:36:46 2015 +0100

----------------------------------------------------------------------
 .../converter/stream/OutputStreamBuilder.java   | 94 ++++++++++++++++++++
 .../org/apache/camel/impl/ZipDataFormat.java    | 18 ++--
 .../camel/processor/MarshalProcessor.java       | 24 +----
 .../org/apache/camel/util/ExchangeHelper.java   | 14 +++
 .../apache/camel/impl/ZipDataFormatTest.java    | 24 +++++
 .../converter/crypto/CryptoDataFormat.java      | 17 ++--
 .../camel/converter/crypto/HMACAccumulator.java |  2 +-
 .../crypto/PGPKeyAccessDataFormat.java          | 36 ++------
 .../dataformat/zipfile/ZipFileDataFormat.java   | 29 +++---
 .../zipfile/ZipFileDataFormatTest.java          | 24 ++++-
 10 files changed, 195 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java b/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java
new file mode 100644
index 0000000..d25780b
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ExchangeHelper;
+
+/**
+ * Utility to hide the complexity of choosing which OutputStream
+ * implementation to use.
+ * <p/>
+ * Itself masquerades as an OutputStream, but really delegates to a
+ * CachedOutputStream or a ByteArrayOutputStream.
+ */
+public final class OutputStreamBuilder extends OutputStream {
+
+    private final OutputStream outputStream;
+
+    private OutputStreamBuilder(final Exchange exchange) {
+        if (ExchangeHelper.isStreamCachingEnabled(exchange)) {
+            outputStream = new CachedOutputStream(exchange);
+        } else {
+            outputStream = new ByteArrayOutputStream();
+        }
+    }
+
+    /**
+     * Creates a new OutputStreamBuilder with the current exchange
+     * <p/>
+     * Call {@link #build()} once writing to the stream is finished
+     * and you need the result of this operation.
+     *
+     * @param exchange the current Exchange
+     * @return the builder
+     */
+    public static OutputStreamBuilder withExchange(final Exchange exchange) {
+        return new OutputStreamBuilder(exchange);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        outputStream.write(b, off, len);
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        outputStream.write(b);
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        outputStream.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        outputStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        outputStream.close();
+    }
+
+    /**
+     * Builds the result of using this builder: the stream cache created by
+     * {@link org.apache.camel.converter.stream.CachedOutputStream#newStreamCache()} if stream caching is enabled,
+     * otherwise a byte[] holding the written data.
+     */
+    public Object build() throws IOException {
+        if (outputStream instanceof CachedOutputStream) {
+            return ((CachedOutputStream)outputStream).newStreamCache();
+        }
+        return ((ByteArrayOutputStream)outputStream).toByteArray();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java b/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
index c107f3f..68ee7be 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.impl;
 
-import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.zip.Deflater;
@@ -24,6 +23,7 @@ import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.DataFormatName;
 import org.apache.camel.util.IOHelper;
@@ -57,7 +57,7 @@ public class ZipDataFormat extends org.apache.camel.support.ServiceSupport imple
         this.compressionLevel = compressionLevel;
     }
 
-    public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+    public void marshal(final Exchange exchange, final Object graph, final OutputStream stream) throws Exception {
         // ask for a mandatory type conversion to avoid a possible NPE beforehand as we do copy from the InputStream
         InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph);
 
@@ -69,18 +69,16 @@ public class ZipDataFormat extends org.apache.camel.support.ServiceSupport imple
         }
     }
 
-    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
-        InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);
-        InflaterInputStream unzipInput = new InflaterInputStream(is);
+    public Object unmarshal(final Exchange exchange, final InputStream inputStream) throws Exception {
+        InflaterInputStream inflaterInputStream = new InflaterInputStream(inputStream);
+        OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
 
-        // Create an expandable byte array to hold the inflated data
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         try {
-            IOHelper.copy(unzipInput, bos);
-            return bos.toByteArray();
+            IOHelper.copy(inflaterInputStream, osb);
+            return osb.build();
         } finally {
             // must close input streams
-            IOHelper.close(is, unzipInput);
+            IOHelper.close(osb, inflaterInputStream, inputStream);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
index d16205f..6206a73 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import java.io.ByteArrayOutputStream;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
@@ -25,7 +23,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Traceable;
-import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
@@ -57,15 +55,7 @@ public class MarshalProcessor extends ServiceSupport implements AsyncProcessor,
 
         // if stream caching is enabled then use that so we can stream accordingly
         // for example to overflow to disk for big streams
-        CachedOutputStream cos;
-        ByteArrayOutputStream os;
-        if (exchange.getContext().getStreamCachingStrategy().isEnabled()) {
-            cos = new CachedOutputStream(exchange);
-            os = null;
-        } else {
-            cos = null;
-            os = new ByteArrayOutputStream();
-        }
+        OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
 
         Message in = exchange.getIn();
         Object body = in.getBody();
@@ -76,14 +66,8 @@ public class MarshalProcessor extends ServiceSupport implements AsyncProcessor,
         out.copyFrom(in);
 
         try {
-            if (cos != null) {
-                dataFormat.marshal(exchange, body, cos);
-                out.setBody(cos.newStreamCache());
-            } else {
-                dataFormat.marshal(exchange, body, os);
-                byte[] data = os.toByteArray();
-                out.setBody(data);
-            }
+            dataFormat.marshal(exchange, body, osb);
+            out.setBody(osb.build());
         } catch (Throwable e) {
             // remove OUT message, as an exception occurred
             exchange.setOut(null);

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 63d8013..505f04e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -628,6 +628,20 @@ public final class ExchangeHelper {
     }
 
     /**
+     * Check whether or not stream caching is enabled for the given route or globally.
+     *
+     * @param exchange  the exchange
+     * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
+     */
+    public static boolean isStreamCachingEnabled(final Exchange exchange) {
+        if (exchange.getFromRouteId() == null) {
+            return exchange.getContext().getStreamCachingStrategy().isEnabled();
+        } else {
+            return exchange.getContext().getRoute(exchange.getFromRouteId()).getRouteContext().isStreamCaching();
+        }
+    }
+
+    /**
      * Extracts the body from the given exchange.
      * <p/>
      * If the exchange pattern is provided it will try to honor it and retrieve the body

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java b/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
index b2b18ca..b4df4cf 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.impl;
 
 import java.io.ByteArrayOutputStream;
+import java.util.List;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
@@ -26,6 +27,7 @@ import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.InputStreamCache;
 import org.apache.camel.spi.DataFormat;
 
 /**
@@ -117,6 +119,28 @@ public class ZipDataFormatTest extends ContextTestSupport {
         result.expectedBodiesReceived(TEXT);
         sendText();
         result.assertIsSatisfied();
+        List<Exchange> exchangeList = result.getExchanges();
+        assertTrue(exchangeList.get(0).getIn().getBody() instanceof byte[]);
+    }
+
+    public void testStreamCacheUnzip() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .streamCaching()
+                    .marshal().zip()
+                    .unmarshal().zip()
+                    .to("mock:result");
+            }
+        });
+        context.start();
+
+        MockEndpoint result = context.getEndpoint("mock:result", MockEndpoint.class);
+        result.expectedBodiesReceived(TEXT);
+        sendText();
+        result.assertIsSatisfied();
+        List<Exchange> exchangeList = result.getExchanges();
+        assertTrue(exchangeList.get(0).getIn().getBody() instanceof InputStreamCache);
     }
 
     private void sendText() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
----------------------------------------------------------------------
diff --git a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
index e8c5178..c602516 100644
--- a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
+++ b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.converter.crypto;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -34,6 +33,7 @@ import static javax.crypto.Cipher.DECRYPT_MODE;
 import static javax.crypto.Cipher.ENCRYPT_MODE;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.DataFormatName;
 import org.apache.camel.support.ServiceSupport;
@@ -153,31 +153,30 @@ public class CryptoDataFormat extends ServiceSupport implements DataFormat, Data
         }
     }
 
-    public Object unmarshal(Exchange exchange, InputStream encryptedStream) throws Exception {
-        Object unmarshalled = null;
+    public Object unmarshal(final Exchange exchange, final InputStream encryptedStream) throws Exception {
         if (encryptedStream != null) {
             byte[] iv = getInlinedInitializationVector(exchange, encryptedStream);
             Key key = getKey(exchange);
             CipherInputStream cipherStream = null;
-            ByteArrayOutputStream plaintextStream = null;
+            OutputStreamBuilder osb = null;
             try {
                 cipherStream = new CipherInputStream(encryptedStream, initializeCipher(DECRYPT_MODE, key, iv));
-                plaintextStream = new ByteArrayOutputStream(bufferSize);
+                osb = OutputStreamBuilder.withExchange(exchange);
                 HMACAccumulator hmac = getMessageAuthenticationCode(key);
                 byte[] buffer = new byte[bufferSize];
-                hmac.attachStream(plaintextStream);
+                hmac.attachStream(osb);
                 int read;
                 while ((read = cipherStream.read(buffer)) >= 0) {
                     hmac.decryptUpdate(buffer, read);
                 }
                 hmac.validate();
-                unmarshalled = plaintextStream.toByteArray();
+                return osb.build();
             } finally {
                 IOHelper.close(cipherStream, "cipher", LOG);
-                IOHelper.close(plaintextStream, "plaintext", LOG);
+                IOHelper.close(osb, "plaintext", LOG);
             }
         }
-        return unmarshalled;
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
----------------------------------------------------------------------
diff --git a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
index a44b4e3..0092f96 100644
--- a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
+++ b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
@@ -114,7 +114,7 @@ public class HMACAccumulator {
         return maclength;
     }
 
-    public void attachStream(ByteArrayOutputStream outputStream) {
+    public void attachStream(final OutputStream outputStream) {
         this.outputStream = outputStream;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
----------------------------------------------------------------------
diff --git a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
index a416d0d..0851630 100644
--- a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
+++ b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
@@ -17,7 +17,6 @@
 package org.apache.camel.converter.crypto;
 
 import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,7 +30,7 @@ import java.util.Date;
 import java.util.List;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.DataFormatName;
 import org.apache.camel.support.ServiceSupport;
@@ -367,10 +366,7 @@ public class PGPKeyAccessDataFormat extends ServiceSupport implements DataFormat
         InputStream encData = null;
         InputStream uncompressedData = null;
         InputStream litData = null;
-
-        CachedOutputStream cos;
-        ByteArrayOutputStream bos;
-        OutputStream os = null;
+        OutputStreamBuilder osb = null;
 
         try {
             in = PGPUtil.getDecoderStream(encryptedStream);
@@ -405,37 +401,23 @@ public class PGPKeyAccessDataFormat extends ServiceSupport implements DataFormat
                 throw getFormatException();
             }
             litData = ld.getInputStream();
-
-            // enable streaming via OutputStreamCache
-            if (exchange.getContext().getStreamCachingStrategy().isEnabled()) {
-                cos = new CachedOutputStream(exchange);
-                bos = null;
-                os = cos;
-            } else {
-                cos = null;
-                bos = new ByteArrayOutputStream();
-                os = bos;
-            }
+            osb = OutputStreamBuilder.withExchange(exchange);
 
             byte[] buffer = new byte[BUFFER_SIZE];
             int bytesRead;
             while ((bytesRead = litData.read(buffer)) != -1) {
-                os.write(buffer, 0, bytesRead);
+                osb.write(buffer, 0, bytesRead);
                 if (signature != null) {
                     signature.update(buffer, 0, bytesRead);
                 }
-                os.flush();
+                osb.flush();
             }
             verifySignature(pgpFactory, signature);
         } finally {
-            IOHelper.close(os, litData, uncompressedData, encData, in, encryptedStream);
+            IOHelper.close(osb, litData, uncompressedData, encData, in, encryptedStream);
         }
 
-        if (cos != null) {
-            return cos.newStreamCache();
-        } else {
-            return bos.toByteArray();
-        }
+        return osb.build();
     }
 
     private InputStream getDecryptedData(Exchange exchange, InputStream encryptedStream) throws Exception, PGPException {
@@ -741,8 +723,8 @@ public class PGPKeyAccessDataFormat extends ServiceSupport implements DataFormat
      * 
      * @param signatureVerificationOption
      *            signature verification option
-     * @throws IllegalArgument
-     *             exception if an invalid value is entered
+     * @throws IllegalArgumentException
+     *            if an invalid value is entered
      */
     public void setSignatureVerificationOption(String signatureVerificationOption) {
         if (SIGNATURE_VERIFICATION_OPTIONS.contains(signatureVerificationOption)) {

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
index e5cffac..b251201 100644
--- a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
+++ b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
@@ -16,15 +16,15 @@
  */
 package org.apache.camel.dataformat.zipfile;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import java.util.zip.ZipOutputStream;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.DataFormatName;
 import org.apache.camel.support.ServiceSupport;
@@ -46,13 +46,13 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
     }
 
     @Override
-    public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+    public void marshal(final Exchange exchange, final Object graph, final OutputStream stream) throws Exception {
         String filename = exchange.getIn().getHeader(FILE_NAME, String.class);
-        if (filename != null) {
-            filename = new File(filename).getName(); // remove any path elements
-        } else {
+        if (filename == null) {
             // generate the file name as the camel file component would do
             filename = StringHelper.sanitize(exchange.getIn().getMessageId());
+        } else {
+            filename = Paths.get(filename).getFileName().toString(); // remove any path elements
         }
 
         ZipOutputStream zos = new ZipOutputStream(stream);
@@ -71,20 +71,18 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
     }
 
     @Override
-    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
+    public Object unmarshal(final Exchange exchange, final InputStream inputStream) throws Exception {
         if (usingIterator) {
             return new ZipIterator(exchange.getIn());
         } else {
-            InputStream is = exchange.getIn().getMandatoryBody(
-                    InputStream.class);
-            ZipInputStream zis = new ZipInputStream(is);
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ZipInputStream zis = new ZipInputStream(inputStream);
+            OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
 
             try {
                 ZipEntry entry = zis.getNextEntry();
                 if (entry != null) {
                     exchange.getOut().setHeader(FILE_NAME, entry.getName());
-                    IOHelper.copy(zis, baos);
+                    IOHelper.copy(zis, osb);
                 }
 
                 entry = zis.getNextEntry();
@@ -92,10 +90,9 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
                     throw new IllegalStateException("Zip file has more than 1 entry.");
                 }
 
-                return baos.toByteArray();
-
+                return osb.build();
             } finally {
-                IOHelper.close(zis, baos);
+                IOHelper.close(zis, osb);
             }
         }
     }
@@ -117,4 +114,4 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
     protected void doStop() throws Exception {
         // noop
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
index 4bcc6b1..7273179 100644
--- a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
+++ b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
@@ -29,9 +29,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.InputStreamCache;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
 import org.junit.Test;
 
 import static org.apache.camel.Exchange.FILE_NAME;
@@ -55,6 +55,21 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
     private static final File TEST_DIR = new File("target/zip");
 
     @Test
+    public void testZipAndStreamCaching() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:zipStreamCache");
+        mock.setExpectedMessageCount(1);
+
+        template.sendBody("direct:zipStreamCache", TEXT);
+
+        assertMockEndpointsSatisfied();
+
+        Exchange exchange = mock.getReceivedExchanges().get(0);
+        assertEquals(exchange.getIn().getMessageId() + ".zip", exchange.getIn().getHeader(FILE_NAME));
+        assertIsInstanceOf(InputStreamCache.class, exchange.getIn().getBody());
+        assertArrayEquals(getZippedText(exchange.getIn().getMessageId()), exchange.getIn().getMandatoryBody(byte[].class));
+    }
+
+    @Test
     public void testZipWithoutFileName() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:zip");
         mock.expectedMessageCount(1);
@@ -65,7 +80,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
 
         Exchange exchange = mock.getReceivedExchanges().get(0);
         assertEquals(exchange.getIn().getMessageId() + ".zip", exchange.getIn().getHeader(FILE_NAME));
-        assertTrue(ObjectHelper.equalByteArray(getZippedText(exchange.getIn().getMessageId()), (byte[])exchange.getIn().getBody()));
+        assertArrayEquals(getZippedText(exchange.getIn().getMessageId()), (byte[])exchange.getIn().getBody());
     }
 
     @Test
@@ -122,7 +137,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
         Exchange exchange = mock.getReceivedExchanges().get(0);
         File file = new File(TEST_DIR, exchange.getIn().getMessageId() + ".zip");
         assertTrue("The file should exist.", file.exists());
-        assertTrue("Get a wrong message content.", ObjectHelper.equalByteArray(getZippedText(exchange.getIn().getMessageId()), getBytes(file)));
+        assertArrayEquals("Get a wrong message content.", getZippedText(exchange.getIn().getMessageId()), getBytes(file));
     }
 
     @Test
@@ -144,7 +159,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
         assertTrue("The exchange is not done in time.", notify.matches(5, TimeUnit.SECONDS));
 
         assertTrue("The file should exist.", file.exists());
-        assertTrue("Get a wrong message content.", ObjectHelper.equalByteArray(getZippedText("poem.txt"), getBytes(file)));
+        assertArrayEquals("Get a wrong message content.", getZippedText("poem.txt"), getBytes(file));
     }
 
     @Test
@@ -188,6 +203,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
                 from("direct:zipToFile").marshal(zip).to("file:" + TEST_DIR.getPath()).to("mock:zipToFile");
                 from("direct:dslZip").marshal().zipFile().to("mock:dslZip");
                 from("direct:dslUnzip").unmarshal().zipFile().to("mock:dslUnzip");
+                from("direct:zipStreamCache").streamCaching().marshal().zipFile().to("mock:zipStreamCache");
             }
         };
     }