Posted to issues@camel.apache.org by "Claus Ibsen (Jira)" <ji...@apache.org> on 2021/09/24 08:28:00 UTC

[jira] [Assigned] (CAMEL-16953) camel-zip-deflater - Use Commons Compress to be able to un-zip large payloads

     [ https://issues.apache.org/jira/browse/CAMEL-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reassigned CAMEL-16953:
-----------------------------------

    Assignee: Claus Ibsen

> camel-zip-deflater - Use Commons Compress to be able to un-zip large payloads
> -----------------------------------------------------------------------------
>
>                 Key: CAMEL-16953
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16953
>             Project: Camel
>          Issue Type: Improvement
>    Affects Versions: 3.11.0
>         Environment: Ubuntu 19.04
> openjdk 11.0.11 2021-04-20
> OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
> OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)
> Camel 3.11.1
>            Reporter: Roman Vottner
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 3.12.0
>
>
> ZulipChat-Discussion: https://camel.zulipchat.com/#narrow/stream/257298-camel/topic/tar.2Egz.20unmarshalling
> In a very simple route that reads tar.gz archives from a given directory and logs the names of the files inside each archive, the following code fails when processing larger tar.gz archives.
>  
> {code:title=PreProcessingRoute.java|borderStyle=solid}
> from(file("archiveFile"))
>     .routeId("pre-processing")
>     .process(exchange -> {
>         LOG.info("Processing archive: {}", exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
>     })
>     .unmarshal().gzipDeflater()
>     .split(new TarSplitter()).streaming()
>     .process(exchange -> {
>         final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
>         LOG.debug("name: {}", name);
>     })
>     .end();
> {code}
> The JVM quickly runs out of memory: the data format copies the decompressed bytes from the original stream into an in-memory buffer, as indicated by the exception thrown in the `hugeCapacity(...)` method of the `ByteArrayOutputStream` class:
>  
> {code:title=StackTrace|borderStyle=solid}
> org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
>  at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
>  at org.apache.camel.support.AbstractExchange.setException(AbstractExchange.java:589)
>  at org.apache.camel.support.DefaultExchange.setException(DefaultExchange.java:27)
>  at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:81)
>  at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
>  at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
>  at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
>  at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
>  at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
>  at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
>  at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
>  at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
>  at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
>  at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: null
>  at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>  at org.apache.camel.support.builder.OutputStreamBuilder.write(OutputStreamBuilder.java:58)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:193)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:148)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:143)
>  at org.apache.camel.util.IOHelper.copy(IOHelper.java:139)
>  at org.apache.camel.dataformat.deflater.GzipDeflaterDataFormat.unmarshal(GzipDeflaterDataFormat.java:63)
>  at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:64)
>  ... 18 common frames omitted
> {code}
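> For illustration, the failure boils down to the eager-copy pattern sketched below. This is a minimal sketch, not the actual `GzipDeflaterDataFormat` source: it shows how collecting the whole decompressed payload in a `ByteArrayOutputStream` exhausts the heap on large archives, which matches the `hugeCapacity(...)` frame in the trace above.
> {code:title=EagerGunzipSketch.java|borderStyle=solid}
> // Minimal sketch (hypothetical class, not Camel source) of the eager-copy
> // pattern the stack trace points at: the entire decompressed payload is
> // buffered in memory before anything downstream can consume it.
> import java.io.ByteArrayOutputStream;
> import java.io.InputStream;
> import java.util.zip.GZIPInputStream;
>
> public class EagerGunzipSketch {
>
>     public static byte[] unmarshal(InputStream compressed) throws Exception {
>         try (GZIPInputStream gzip = new GZIPInputStream(compressed);
>              ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
>             byte[] chunk = new byte[4096];
>             int read;
>             while ((read = gzip.read(chunk)) != -1) {
>                 // the backing byte[] keeps growing; for a multi-GB payload
>                 // ByteArrayOutputStream.hugeCapacity(...) eventually throws OOM
>                 buffer.write(chunk, 0, read);
>             }
>             return buffer.toByteArray();
>         }
>     }
> }
> {code}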
> If I instead use a custom DataFormat class that looks like this:
> {code:title=GZipDataFormat.java|borderStyle=solid}
> import org.apache.camel.Exchange;
> import org.apache.camel.spi.DataFormat;
> import org.apache.camel.util.IOHelper;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
> import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
>
> import java.io.InputStream;
> import java.io.OutputStream;
>
> public class GZipDataFormat implements DataFormat {
>
>     @Override
>     public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
>         InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph);
>         GzipCompressorOutputStream zipOutput = new GzipCompressorOutputStream(stream);
>         try {
>             IOHelper.copy(is, zipOutput);
>         } finally {
>             // must close all input streams
>             IOHelper.close(is, zipOutput);
>         }
>     }
>
>     @Override
>     public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
>         // return the decompressing stream directly instead of copying it into memory
>         return new GzipCompressorInputStream(exchange.getIn().getMandatoryBody(InputStream.class));
>     }
>
>     @Override
>     public void start() {
>     }
>
>     @Override
>     public void stop() {
>     }
> }
> {code}
> and change `unmarshal().gzipDeflater()` to `unmarshal(new GZipDataFormat())`, Camel is able to decompress the bytes correctly and pass the stream on, so that the `TarSplitter` can iterate over the entries of the archive.
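> For clarity, here is a sketch of the adjusted route; it is identical to the failing route above except for the unmarshal step:
> {code:title=PreProcessingRoute.java (adjusted)|borderStyle=solid}
> from(file("archiveFile"))
>     .routeId("pre-processing")
>     .process(exchange -> {
>         LOG.info("Processing archive: {}", exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
>     })
>     .unmarshal(new GZipDataFormat()) // hands a decompressing stream downstream instead of buffering
>     .split(new TarSplitter()).streaming()
>     .process(exchange -> {
>         final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
>         LOG.debug("name: {}", name);
>     })
>     .end();
> {code}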


