You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/07 10:56:13 UTC
[camel] branch master updated: CAMEL-13774: camel-zipfile now
supports marshal an iterator to zipfile
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3417c9e CAMEL-13774: camel-zipfile now supports marshal an iterator to zipfile
3417c9e is described below
commit 3417c9ef5fa45f86dfa386c8b38a9ce4bf0bdd30
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Aug 7 12:54:59 2019 +0200
CAMEL-13774: camel-zipfile now supports marshal an iterator to zipfile
---
.../dataformat/zipfile/ZipFileDataFormat.java | 20 +++++-
.../zipfile/ZipFileIteratorDataFormatTest.java | 67 +++++++++++++++++++
.../camel/support/InputStreamIteratorTest.java | 51 ++++++++++++++
.../apache/camel/support/InputStreamIterator.java | 78 ++++++++++++++++++++++
4 files changed, 215 insertions(+), 1 deletion(-)
diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
index fe889f5..5fd9f4e 100644
--- a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
+++ b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipFileDataFormat.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.dataformat.zipfile;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -27,9 +28,12 @@ import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.TypeConverter;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.DataFormatName;
import org.apache.camel.spi.annotations.Dataformat;
+import org.apache.camel.support.InputStreamIterator;
import org.apache.camel.support.builder.OutputStreamBuilder;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
@@ -71,7 +75,21 @@ public class ZipFileDataFormat extends ServiceSupport implements DataFormat, Dat
createZipEntries(zos, filename);
}
- InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph);
+ Object body = exchange.getIn().getBody();
+ TypeConverter converter = exchange.getContext().getTypeConverter();
+ // favour using input stream
+ InputStream is = converter.tryConvertTo(InputStream.class, exchange, body);
+ if (is == null) {
+ // okay so try to see if its an iterator which we can wrap as input stream
+ Iterator it = converter.tryConvertTo(Iterator.class, exchange, body);
+ if (it != null) {
+ is = new InputStreamIterator(converter, it);
+ is = new BufferedInputStream(is);
+ }
+ }
+ if (is == null) {
+ throw new InvalidPayloadException(exchange, body.getClass());
+ }
try {
IOHelper.copy(is, zos);
diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileIteratorDataFormatTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileIteratorDataFormatTest.java
new file mode 100644
index 0000000..f576429
--- /dev/null
+++ b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileIteratorDataFormatTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.zipfile;
+
+import java.io.File;
+import java.util.stream.Stream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class ZipFileIteratorDataFormatTest extends CamelTestSupport {
+
+    /** Marshals a stream of strings to a zip file, then unzips it and checks the concatenated content. */
+    @Test
+    public void testZipFileIterator() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        Stream<String> body = Stream.of("ABC", "DEF", "1234567890");
+        template.sendBody("direct:zip", body);
+
+        assertMockEndpointsSatisfied();
+        resetMocks();
+
+        // unzip the file: the streamed elements must come back concatenated in order
+        getMockEndpoint("mock:unzip").expectedBodiesReceived("ABCDEF1234567890");
+
+        File file = new File("target/output/report.txt.zip");
+        template.sendBody("direct:unzip", file);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:zip")
+                    .setHeader(Exchange.FILE_NAME, constant("report.txt"))
+                    .marshal().zipFile()
+                    .to("file:target/output")
+                    .to("mock:result");
+
+                from("direct:unzip")
+                    .unmarshal().zipFile()
+                    .to("mock:unzip");
+            }
+        };
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/support/InputStreamIteratorTest.java b/core/camel-core/src/test/java/org/apache/camel/support/InputStreamIteratorTest.java
new file mode 100644
index 0000000..4b094f0
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/support/InputStreamIteratorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.util.IOHelper;
+import org.junit.Test;
+
+public class InputStreamIteratorTest extends ContextTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testInputStreamIterator() throws Exception {
+ context.start();
+
+ Iterator it = Stream.of("ABC", "DEF", "1234567890").iterator();
+ InputStreamIterator is = new InputStreamIterator(context.getTypeConverter(), it);
+ // from the first chunk
+ assertEquals(3, is.available());
+
+ // copy input to output
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ IOHelper.copy(is, bos);
+ IOHelper.close(is, bos);
+
+ // and we have the data back
+ assertEquals("ABCDEF1234567890", bos.toString());
+ }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/InputStreamIterator.java b/core/camel-support/src/main/java/org/apache/camel/support/InputStreamIterator.java
new file mode 100644
index 0000000..5c80b2e
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/InputStreamIterator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.TypeConverter;
+
+/**
+ * An {@link InputStream} that wraps an {@link Iterator} which reads iterations as byte array data.
+ */
+public final class InputStreamIterator extends InputStream {
+    private final TypeConverter converter;
+    private final Iterator<?> it;
+    // stream over the element currently being read, or null when the next element must be fetched
+    private InputStream chunk;
+
+    /** Wraps {@code it}; each element is converted to bytes with {@code converter} when it is read. */
+    public InputStreamIterator(TypeConverter converter, Iterator<?> it) {
+        this.converter = converter;
+        this.it = it;
+    }
+
+    /**
+     * Reads the next byte, moving on to the next element whenever the current one is exhausted.
+     * Loops instead of recursing so a long run of empty elements cannot overflow the stack.
+     */
+    @Override
+    public int read() throws IOException {
+        while (chunk != null || (chunk = nextChunk()) != null) {
+            int data = chunk.read();
+            if (data != -1) {
+                return data;
+            }
+            chunk = null;
+        }
+        return -1;
+    }
+
+    /** Returns the bytes remaining in the current element, loading the next element if none is open. */
+    @Override
+    public int available() throws IOException {
+        if (chunk == null) {
+            chunk = nextChunk();
+        }
+        return chunk != null ? chunk.available() : 0;
+    }
+
+    /** Converts the next element to a byte array stream; returns null once the iterator is exhausted. */
+    private InputStream nextChunk() throws IOException {
+        if (!it.hasNext()) {
+            return null;
+        }
+        try {
+            return new ByteArrayInputStream(converter.mandatoryConvertTo(byte[].class, it.next()));
+        } catch (NoTypeConversionAvailableException e) {
+            throw new IOException(e);
+        }
+    }
+}