You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/05 13:33:37 UTC
camel git commit: CAMEL-9274: Data formats unmarshal should use
OutputStreamBuilder so we can better support stream caching. Many thanks to
Aaron Whiteside for the patch.
Repository: camel
Updated Branches:
refs/heads/master 205a99986 -> 53f3afaf3
CAMEL-9274: Data formats unmarshal should use OutputStreamBuilder so we can better support stream caching. Many thanks to Aaron Whiteside for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/53f3afaf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/53f3afaf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/53f3afaf
Branch: refs/heads/master
Commit: 53f3afaf34ad1e079d93b583ecdf159d357f6bb7
Parents: 205a999
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Nov 5 13:36:46 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Nov 5 13:36:46 2015 +0100
----------------------------------------------------------------------
.../converter/stream/OutputStreamBuilder.java | 94 ++++++++++++++++++++
.../org/apache/camel/impl/ZipDataFormat.java | 18 ++--
.../camel/processor/MarshalProcessor.java | 24 +----
.../org/apache/camel/util/ExchangeHelper.java | 14 +++
.../apache/camel/impl/ZipDataFormatTest.java | 24 +++++
.../converter/crypto/CryptoDataFormat.java | 17 ++--
.../camel/converter/crypto/HMACAccumulator.java | 2 +-
.../crypto/PGPKeyAccessDataFormat.java | 36 ++------
.../dataformat/zipfile/ZipFileDataFormat.java | 29 +++---
.../zipfile/ZipFileDataFormatTest.java | 24 ++++-
10 files changed, 195 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java b/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java
new file mode 100644
index 0000000..d25780b
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/converter/stream/OutputStreamBuilder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.util.ExchangeHelper;
+
+/**
+ * Utility to hide the complexity of choosing which OutputStream
+ * implementation to choose.
+ * <p/>
+ * Itself masquerades as an OutputStream, but really delegates to a
+ * CachedOutputStream of a ByteArrayOutputStream.
+ */
+public final class OutputStreamBuilder extends OutputStream {
+
+ private final OutputStream outputStream;
+
+ private OutputStreamBuilder(final Exchange exchange) {
+ if (ExchangeHelper.isStreamCachingEnabled(exchange)) {
+ outputStream = new CachedOutputStream(exchange);
+ } else {
+ outputStream = new ByteArrayOutputStream();
+ }
+ }
+
+ /**
+ * Creates a new OutputStreamBuilder with the current exchange
+ * <p/>
+ * Use the {@link #build()} when writing to the stream is finished,
+ * and you need the result of this operation.
+ *
+ * @param exchange the current Exchange
+ * @return the builder
+ */
+ public static OutputStreamBuilder withExchange(final Exchange exchange) {
+ return new OutputStreamBuilder(exchange);
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ outputStream.write(b, off, len);
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ outputStream.write(b);
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ outputStream.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ outputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputStream.close();
+ }
+
+ /**
+ * Builds the result of using this builder as either a
+ * {@link org.apache.camel.converter.stream.CachedOutputStream} if stream caching is enabled,
+ * otherwise byte[].
+ */
+ public Object build() throws IOException {
+ if (outputStream instanceof CachedOutputStream) {
+ return ((CachedOutputStream)outputStream).newStreamCache();
+ }
+ return ((ByteArrayOutputStream)outputStream).toByteArray();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java b/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
index c107f3f..68ee7be 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ZipDataFormat.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.impl;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.Deflater;
@@ -24,6 +23,7 @@ import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.camel.Exchange;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatName;
import org.apache.camel.util.IOHelper;
@@ -57,7 +57,7 @@ public class ZipDataFormat extends org.apache.camel.support.ServiceSupport imple
this.compressionLevel = compressionLevel;
}
- public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+ public void marshal(final Exchange exchange, final Object graph, final OutputStream stream) throws Exception {
// ask for a mandatory type conversion to avoid a possible NPE beforehand as we do copy from the InputStream
InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph);
@@ -69,18 +69,16 @@ public class ZipDataFormat extends org.apache.camel.support.ServiceSupport imple
}
}
- public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
- InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);
- InflaterInputStream unzipInput = new InflaterInputStream(is);
+ public Object unmarshal(final Exchange exchange, final InputStream inputStream) throws Exception {
+ InflaterInputStream inflaterInputStream = new InflaterInputStream(inputStream);
+ OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
- // Create an expandable byte array to hold the inflated data
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
- IOHelper.copy(unzipInput, bos);
- return bos.toByteArray();
+ IOHelper.copy(inflaterInputStream, osb);
+ return osb.build();
} finally {
// must close input streams
- IOHelper.close(is, unzipInput);
+ IOHelper.close(osb, inflaterInputStream, inputStream);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
index d16205f..6206a73 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MarshalProcessor.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.processor;
-import java.io.ByteArrayOutputStream;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
@@ -25,7 +23,7 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
-import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdAware;
import org.apache.camel.support.ServiceSupport;
@@ -57,15 +55,7 @@ public class MarshalProcessor extends ServiceSupport implements AsyncProcessor,
// if stream caching is enabled then use that so we can stream accordingly
// for example to overflow to disk for big streams
- CachedOutputStream cos;
- ByteArrayOutputStream os;
- if (exchange.getContext().getStreamCachingStrategy().isEnabled()) {
- cos = new CachedOutputStream(exchange);
- os = null;
- } else {
- cos = null;
- os = new ByteArrayOutputStream();
- }
+ OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
Message in = exchange.getIn();
Object body = in.getBody();
@@ -76,14 +66,8 @@ public class MarshalProcessor extends ServiceSupport implements AsyncProcessor,
out.copyFrom(in);
try {
- if (cos != null) {
- dataFormat.marshal(exchange, body, cos);
- out.setBody(cos.newStreamCache());
- } else {
- dataFormat.marshal(exchange, body, os);
- byte[] data = os.toByteArray();
- out.setBody(data);
- }
+ dataFormat.marshal(exchange, body, osb);
+ out.setBody(osb.build());
} catch (Throwable e) {
// remove OUT message, as an exception occurred
exchange.setOut(null);
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 63d8013..505f04e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -628,6 +628,20 @@ public final class ExchangeHelper {
}
/**
+ * Check whether or not stream caching is enabled for the given route or globally.
+ *
+ * @param exchange the exchange
+ * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
+ */
+ public static boolean isStreamCachingEnabled(final Exchange exchange) {
+ if (exchange.getFromRouteId() == null) {
+ return exchange.getContext().getStreamCachingStrategy().isEnabled();
+ } else {
+ return exchange.getContext().getRoute(exchange.getFromRouteId()).getRouteContext().isStreamCaching();
+ }
+ }
+
+ /**
* Extracts the body from the given exchange.
* <p/>
* If the exchange pattern is provided it will try to honor it and retrieve the body
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java b/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
index b2b18ca..b4df4cf 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ZipDataFormatTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.impl;
import java.io.ByteArrayOutputStream;
+import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@@ -26,6 +27,7 @@ import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.InputStreamCache;
import org.apache.camel.spi.DataFormat;
/**
@@ -117,6 +119,28 @@ public class ZipDataFormatTest extends ContextTestSupport {
result.expectedBodiesReceived(TEXT);
sendText();
result.assertIsSatisfied();
+ List<Exchange> exchangeList = result.getExchanges();
+ assertTrue(exchangeList.get(0).getIn().getBody() instanceof byte[]);
+ }
+
+ public void testStreamCacheUnzip() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .streamCaching()
+ .marshal().zip()
+ .unmarshal().zip()
+ .to("mock:result");
+ }
+ });
+ context.start();
+
+ MockEndpoint result = context.getEndpoint("mock:result", MockEndpoint.class);
+ result.expectedBodiesReceived(TEXT);
+ sendText();
+ result.assertIsSatisfied();
+ List<Exchange> exchangeList = result.getExchanges();
+ assertTrue(exchangeList.get(0).getIn().getBody() instanceof InputStreamCache);
}
private void sendText() throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
----------------------------------------------------------------------
diff --git a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
index e8c5178..c602516 100644
--- a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
+++ b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/CryptoDataFormat.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.converter.crypto;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -34,6 +33,7 @@ import static javax.crypto.Cipher.DECRYPT_MODE;
import static javax.crypto.Cipher.ENCRYPT_MODE;
import org.apache.camel.Exchange;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatName;
import org.apache.camel.support.ServiceSupport;
@@ -153,31 +153,30 @@ public class CryptoDataFormat extends ServiceSupport implements DataFormat, Data
}
}
- public Object unmarshal(Exchange exchange, InputStream encryptedStream) throws Exception {
- Object unmarshalled = null;
+ public Object unmarshal(final Exchange exchange, final InputStream encryptedStream) throws Exception {
if (encryptedStream != null) {
byte[] iv = getInlinedInitializationVector(exchange, encryptedStream);
Key key = getKey(exchange);
CipherInputStream cipherStream = null;
- ByteArrayOutputStream plaintextStream = null;
+ OutputStreamBuilder osb = null;
try {
cipherStream = new CipherInputStream(encryptedStream, initializeCipher(DECRYPT_MODE, key, iv));
- plaintextStream = new ByteArrayOutputStream(bufferSize);
+ osb = OutputStreamBuilder.withExchange(exchange);
HMACAccumulator hmac = getMessageAuthenticationCode(key);
byte[] buffer = new byte[bufferSize];
- hmac.attachStream(plaintextStream);
+ hmac.attachStream(osb);
int read;
while ((read = cipherStream.read(buffer)) >= 0) {
hmac.decryptUpdate(buffer, read);
}
hmac.validate();
- unmarshalled = plaintextStream.toByteArray();
+ return osb.build();
} finally {
IOHelper.close(cipherStream, "cipher", LOG);
- IOHelper.close(plaintextStream, "plaintext", LOG);
+ IOHelper.close(osb, "plaintext", LOG);
}
}
- return unmarshalled;
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
----------------------------------------------------------------------
diff --git a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
index a44b4e3..0092f96 100644
--- a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
+++ b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/HMACAccumulator.java
@@ -114,7 +114,7 @@ public class HMACAccumulator {
return maclength;
}
- public void attachStream(ByteArrayOutputStream outputStream) {
+ public void attachStream(final OutputStream outputStream) {
this.outputStream = outputStream;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
----------------------------------------------------------------------
diff --git a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
index a416d0d..0851630 100644
--- a/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
+++ b/components/camel-crypto/src/main/java/org/apache/camel/converter/crypto/PGPKeyAccessDataFormat.java
@@ -17,7 +17,6 @@
package org.apache.camel.converter.crypto;
import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,7 +30,7 @@ import java.util.Date;
import java.util.List;
import org.apache.camel.Exchange;
-import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatName;
import org.apache.camel.support.ServiceSupport;
@@ -367,10 +366,7 @@ public class PGPKeyAccessDataFormat extends ServiceSupport implements DataFormat
InputStream encData = null;
InputStream uncompressedData = null;
InputStream litData = null;
-
- CachedOutputStream cos;
- ByteArrayOutputStream bos;
- OutputStream os = null;
+ OutputStreamBuilder osb = null;
try {
in = PGPUtil.getDecoderStream(encryptedStream);
@@ -405,37 +401,23 @@ public class PGPKeyAccessDataFormat extends ServiceSupport implements DataFormat
throw getFormatException();
}
litData = ld.getInputStream();
-
- // enable streaming via OutputStreamCache
- if (exchange.getContext().getStreamCachingStrategy().isEnabled()) {
- cos = new CachedOutputStream(exchange);
- bos = null;
- os = cos;
- } else {
- cos = null;
- bos = new ByteArrayOutputStream();
- os = bos;
- }
+ osb = OutputStreamBuilder.withExchange(exchange);
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = litData.read(buffer)) != -1) {
- os.write(buffer, 0, bytesRead);
+ osb.write(buffer, 0, bytesRead);
if (signature != null) {
signature.update(buffer, 0, bytesRead);
}
- os.flush();
+ osb.flush();
}
verifySignature(pgpFactory, signature);
} finally {
- IOHelper.close(os, litData, uncompressedData, encData, in, encryptedStream);
+ IOHelper.close(osb, litData, uncompressedData, encData, in, encryptedStream);
}
- if (cos != null) {
- return cos.newStreamCache();
- } else {
- return bos.toByteArray();
- }
+ return osb.build();
}
private InputStream getDecryptedData(Exchange exchange, InputStream encryptedStream) throws Exception, PGPException {
@@ -741,8 +723,8 @@ public class PGPKeyAccessDataFormat extends ServiceSupport implements DataFormat
*
* @param signatureVerificationOption
* signature verification option
- * @throws IllegalArgument
- * exception if an invalid value is entered
+ * @throws IllegalArgumentException
+ * if an invalid value is entered
*/
public void setSignatureVerificationOption(String signatureVerificationOption) {
if (SIGNATURE_VERIFICATION_OPTIONS.contains(signatureVerificationOption)) {
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
index e5cffac..b251201 100644
--- a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
+++ b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
@@ -16,15 +16,15 @@
*/
package org.apache.camel.dataformat.zipfile;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.camel.Exchange;
+import org.apache.camel.converter.stream.OutputStreamBuilder;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatName;
import org.apache.camel.support.ServiceSupport;
@@ -46,13 +46,13 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
}
@Override
- public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
+ public void marshal(final Exchange exchange, final Object graph, final OutputStream stream) throws Exception {
String filename = exchange.getIn().getHeader(FILE_NAME, String.class);
- if (filename != null) {
- filename = new File(filename).getName(); // remove any path elements
- } else {
+ if (filename == null) {
// generate the file name as the camel file component would do
filename = StringHelper.sanitize(exchange.getIn().getMessageId());
+ } else {
+ filename = Paths.get(filename).getFileName().toString(); // remove any path elements
}
ZipOutputStream zos = new ZipOutputStream(stream);
@@ -71,20 +71,18 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
}
@Override
- public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
+ public Object unmarshal(final Exchange exchange, final InputStream inputStream) throws Exception {
if (usingIterator) {
return new ZipIterator(exchange.getIn());
} else {
- InputStream is = exchange.getIn().getMandatoryBody(
- InputStream.class);
- ZipInputStream zis = new ZipInputStream(is);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ZipInputStream zis = new ZipInputStream(inputStream);
+ OutputStreamBuilder osb = OutputStreamBuilder.withExchange(exchange);
try {
ZipEntry entry = zis.getNextEntry();
if (entry != null) {
exchange.getOut().setHeader(FILE_NAME, entry.getName());
- IOHelper.copy(zis, baos);
+ IOHelper.copy(zis, osb);
}
entry = zis.getNextEntry();
@@ -92,10 +90,9 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
throw new IllegalStateException("Zip file has more than 1 entry.");
}
- return baos.toByteArray();
-
+ return osb.build();
} finally {
- IOHelper.close(zis, baos);
+ IOHelper.close(zis, osb);
}
}
}
@@ -117,4 +114,4 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
protected void doStop() throws Exception {
// noop
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/53f3afaf/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
index 4bcc6b1..7273179 100644
--- a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
+++ b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormatTest.java
@@ -29,9 +29,9 @@ import org.apache.camel.Exchange;
import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.InputStreamCache;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
import org.junit.Test;
import static org.apache.camel.Exchange.FILE_NAME;
@@ -55,6 +55,21 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
private static final File TEST_DIR = new File("target/zip");
@Test
+ public void testZipAndStreamCaching() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:zipStreamCache");
+ mock.setExpectedMessageCount(1);
+
+ template.sendBody("direct:zipStreamCache", TEXT);
+
+ assertMockEndpointsSatisfied();
+
+ Exchange exchange = mock.getReceivedExchanges().get(0);
+ assertEquals(exchange.getIn().getMessageId() + ".zip", exchange.getIn().getHeader(FILE_NAME));
+ assertIsInstanceOf(InputStreamCache.class, exchange.getIn().getBody());
+ assertArrayEquals(getZippedText(exchange.getIn().getMessageId()), exchange.getIn().getMandatoryBody(byte[].class));
+ }
+
+ @Test
public void testZipWithoutFileName() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:zip");
mock.expectedMessageCount(1);
@@ -65,7 +80,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
Exchange exchange = mock.getReceivedExchanges().get(0);
assertEquals(exchange.getIn().getMessageId() + ".zip", exchange.getIn().getHeader(FILE_NAME));
- assertTrue(ObjectHelper.equalByteArray(getZippedText(exchange.getIn().getMessageId()), (byte[])exchange.getIn().getBody()));
+ assertArrayEquals(getZippedText(exchange.getIn().getMessageId()), (byte[])exchange.getIn().getBody());
}
@Test
@@ -122,7 +137,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
Exchange exchange = mock.getReceivedExchanges().get(0);
File file = new File(TEST_DIR, exchange.getIn().getMessageId() + ".zip");
assertTrue("The file should exist.", file.exists());
- assertTrue("Get a wrong message content.", ObjectHelper.equalByteArray(getZippedText(exchange.getIn().getMessageId()), getBytes(file)));
+ assertArrayEquals("Get a wrong message content.", getZippedText(exchange.getIn().getMessageId()), getBytes(file));
}
@Test
@@ -144,7 +159,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
assertTrue("The exchange is not done in time.", notify.matches(5, TimeUnit.SECONDS));
assertTrue("The file should exist.", file.exists());
- assertTrue("Get a wrong message content.", ObjectHelper.equalByteArray(getZippedText("poem.txt"), getBytes(file)));
+ assertArrayEquals("Get a wrong message content.", getZippedText("poem.txt"), getBytes(file));
}
@Test
@@ -188,6 +203,7 @@ public class ZipFileDataFormatTest extends CamelTestSupport {
from("direct:zipToFile").marshal(zip).to("file:" + TEST_DIR.getPath()).to("mock:zipToFile");
from("direct:dslZip").marshal().zipFile().to("mock:dslZip");
from("direct:dslUnzip").unmarshal().zipFile().to("mock:dslUnzip");
+ from("direct:zipStreamCache").streamCaching().marshal().zipFile().to("mock:zipStreamCache");
}
};
}