Posted to issues@camel.apache.org by "Roman Vottner (Jira)" <ji...@apache.org> on 2021/09/13 17:20:00 UTC

[jira] [Updated] (CAMEL-16953) Unmarshalling large tar.gz files fails

     [ https://issues.apache.org/jira/browse/CAMEL-16953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Vottner updated CAMEL-16953:
----------------------------------
    Description: 
ZulipChat-Discussion: https://camel.zulipchat.com/#narrow/stream/257298-camel/topic/tar.2Egz.20unmarshalling

The following very simple route reads every tar.gz archive found in the configured directory and logs the names of the files contained in each archive. It fails when processing larger tar.gz archives.
 
{code:title=PreProcessingRoute.java|borderStyle=solid}
from(file("archiveFile"))
    .routeId("pre-processing")
    .process(exchange -> {
        LOG.info("Processing archive: {}", exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
    })
    .unmarshal().gzipDeflater()
    .split(new TarSplitter()).streaming()
        .process(exchange -> {
            final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
            LOG.debug("name: {}", name);
        })
    .end();
{code}

The JVM quickly runs out of memory. Judging from the stack trace, `GzipDeflaterDataFormat.unmarshal(...)` copies the decompressed bytes via `IOHelper.copy(...)` into an in-memory buffer (an `OutputStreamBuilder` backed by a `ByteArrayOutputStream`) instead of passing on a decompressing stream; the error is thrown in the `hugeCapacity(...)` method of the `ByteArrayOutputStream` class:
 
{code:title=StackTrace|borderStyle=solid}
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
 at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
 at org.apache.camel.support.AbstractExchange.setException(AbstractExchange.java:589)
 at org.apache.camel.support.DefaultExchange.setException(DefaultExchange.java:27)
 at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:81)
 at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
 at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
 at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
 at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
 at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
 at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
 at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
 at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
 at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
 at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: null
 at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
 at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
 at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
 at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
 at org.apache.camel.support.builder.OutputStreamBuilder.write(OutputStreamBuilder.java:58)
 at org.apache.camel.util.IOHelper.copy(IOHelper.java:193)
 at org.apache.camel.util.IOHelper.copy(IOHelper.java:148)
 at org.apache.camel.util.IOHelper.copy(IOHelper.java:143)
 at org.apache.camel.util.IOHelper.copy(IOHelper.java:139)
 at org.apache.camel.dataformat.deflater.GzipDeflaterDataFormat.unmarshal(GzipDeflaterDataFormat.java:63)
 at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:64)
 ... 18 common frames omitted
{code}
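The `OutOfMemoryError: null` above is raised by the overflow check in `ByteArrayOutputStream.hugeCapacity(...)` rather than by a normal heap allocation failure. A `ByteArrayOutputStream` keeps everything in a single `byte[]`, whose length is an `int`, so once the required capacity (bytes already buffered plus the next chunk) exceeds `Integer.MAX_VALUE` (about 2 GiB) the `int` sum wraps around to a negative value and the JDK throws. This points to decompressed content larger than roughly 2 GiB, a limit that raising the heap size would not lift. A minimal sketch of that arithmetic follows; it mirrors the JDK 8 check rather than calling it, and the class name `CapacityOverflowDemo`, the method `capacityOverflows`, and the 8 KiB chunk size are hypothetical:

```java
class CapacityOverflowDemo {

    // Mirrors the test in JDK 8's ByteArrayOutputStream.hugeCapacity(int):
    // a negative required capacity means count + len wrapped past Integer.MAX_VALUE.
    static boolean capacityOverflows(int count, int len) {
        int minCapacity = count + len; // plain int arithmetic, silently wraps on overflow
        return minCapacity < 0;
    }

    public static void main(String[] args) {
        int chunk = 8192; // hypothetical size of one chunk written by a copy loop

        // 1 GiB already buffered: the next chunk still fits into an int-sized array
        System.out.println(capacityOverflows(1 << 30, chunk)); // prints false

        // just below Integer.MAX_VALUE: the next chunk wraps to a negative capacity
        System.out.println(capacityOverflows(Integer.MAX_VALUE - 100, chunk)); // prints true

        // prints "Integer.MAX_VALUE bytes ~ 2047 MiB"
        System.out.println("Integer.MAX_VALUE bytes ~ " + (Integer.MAX_VALUE / (1024 * 1024)) + " MiB");
    }
}
```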

If I instead use a custom DataFormat class that looks like this:

{code:title=GZipDataFormat.java|borderStyle=solid}
import org.apache.camel.Exchange;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.util.IOHelper;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

import java.io.InputStream;
import java.io.OutputStream;

public class GZipDataFormat implements DataFormat {

    @Override
    public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
        InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph);

        GzipCompressorOutputStream zipOutput = new GzipCompressorOutputStream(stream);
        try {
            IOHelper.copy(is, zipOutput);
        } finally {
            // close the input and the gzip stream; closing zipOutput writes the gzip trailer
            IOHelper.close(is, zipOutput);
        }
    }

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
        // return a stream that decompresses lazily instead of buffering the whole content in memory
        return new GzipCompressorInputStream(exchange.getIn().getMandatoryBody(InputStream.class));
    }

    @Override
    public void start() {
        // nothing to start
    }

    @Override
    public void stop() {
        // nothing to stop
    }
}
{code}

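and replace `unmarshal().gzipDeflater()` with `unmarshal(new GZipDataFormat())`, Camel decompresses the archive correctly and passes the stream on, so that the `TarSplitter` can iterate over the entries of the archive. Unlike `gzipDeflater()`, this data format returns the `GzipCompressorInputStream` itself, so the content is decompressed on demand while the splitter reads it instead of being copied into memory first.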
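To see why handing on a wrapping stream keeps memory bounded, here is a minimal JDK-only sketch. It substitutes `java.util.zip.GZIPInputStream` for Commons Compress' `GzipCompressorInputStream` and a simple byte counter for the `TarSplitter`; the class `StreamingGunzipDemo`, its helper methods, and the 64 MiB sample size are hypothetical. The consumer reads through one fixed 8 KiB buffer, so its memory use does not depend on the decompressed size:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class StreamingGunzipDemo {

    // Builds a small gzip payload that inflates to `size` zero bytes.
    static byte[] gzipZeros(long size) throws IOException {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
            byte[] chunk = new byte[64 * 1024];
            for (long written = 0; written < size; written += chunk.length) {
                gz.write(chunk, 0, (int) Math.min(chunk.length, size - written));
            }
        }
        return compressed.toByteArray();
    }

    // Counts decompressed bytes while holding only one fixed-size buffer,
    // the way a streaming consumer (such as a splitter) reads the body.
    static long countDecompressed(InputStream gzipped) throws IOException {
        long total = 0;
        byte[] buffer = new byte[8192];
        try (InputStream in = new GZIPInputStream(gzipped)) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                total += n;
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        long size = 64L * 1024 * 1024; // 64 MiB once decompressed
        byte[] payload = gzipZeros(size);
        System.out.println("compressed bytes: " + payload.length);
        // prints "decompressed bytes: 67108864"
        System.out.println("decompressed bytes: " + countDecompressed(new ByteArrayInputStream(payload)));
    }
}
```

The same loop over a multi-gigabyte archive would still hold only one buffer at a time, whereas copying into a `ByteArrayOutputStream` grows with the decompressed size until it hits the `int` array limit.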

> Unmarshalling large tar.gz files fails
> --------------------------------------
>
>                 Key: CAMEL-16953
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16953
>             Project: Camel
>          Issue Type: Bug
>          Components: came-core
>    Affects Versions: 3.x
>         Environment: Ubuntu 19.04
> openjdk 11.0.11 2021-04-20
> OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
> OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)
> Camel 3.11.1
>            Reporter: Roman Vottner
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)