Posted to dev@flink.apache.org by Etienne Chauchot <ec...@apache.org> on 2021/10/15 11:02:47 UTC

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

Hi Sandeep and community,

I'm a PMC member of the Beam community and also a contributor on Flink. 
I have also used Parquet in Beam pipelines running on the Flink runner, 
so I should be able to help:

I don't think it is a Flink issue: since you have a Beam pipeline, you use 
only Beam IOs, which the Beam runner wraps inside a Flink operator at 
(Beam) translation time.

I don't think it is related to S3 either, but much more to ParquetIO in 
Beam (the read and write connector).

I'll check at the Beam side and get back to you.

Best

Etienne.

On 30/09/2021 14:42, Till Rohrmann wrote:
> Hi Sandeep,
>
> I am not a Beam expert. The problem might be caused by the used S3
> filesystem implementation. Have you tried whether the same problem occurs
> when using vanilla Flink's latest version? Alternatively, you could also
> reach out to the Beam community or ask on Flink's user ML whether people
> have experience with such a problem.
>
> Some of the exceptions look as if your network is a bit flaky. You might
> want to look into the infrastructure you are running on.
>
> Cheers,
> Till
>
> On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
> <Sa...@intuit.com.invalid> wrote:
>
>> Hi,
>>     We have a simple Beam application which reads from Kafka, converts to
>> Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12).
>> We apply a fixed window of 5 minutes after the conversion to
>> PCollection<GenericRecord> and then write to S3. We have around 320
>> columns in our data. Our intention is to write large files of 128 MB or
>> more so that we don't hit the small-files problem when reading back from
>> Hive. But from what we observed, it takes too much memory to write to S3
>> (an 8 GB heap is not enough to write 50 MB files; the job goes OOM). When
>> I increase the heap to 32 GB, it takes a long time to write records to S3.
>> For instance it takes:
>>
>> 20 MB file - 30 sec
>> 50 MB file - 1 min 16 sec
>> 75 MB file - 2 min 15 sec
>> 83 MB file - 2 min 40 sec
>>
>> Code block to write to S3:
>> PCollection<GenericRecord> parquetRecord = ………………………….
>>
>> parquetRecord.apply(FileIO.<GenericRecord>write()
>>                  .via(ParquetIO.sink(getOutput_schema()))
>>                  .to(outputPath.isEmpty() ? outputPath() : outputPath)
>>                  .withNumShards(5)
>>                  .withNaming(new CustomFileNaming("snappy.parquet")));
>>
>>
>> We are also getting different exceptions like:
>>
>>
>>    1.  UserCodeException:
>>
>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>>              at
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>              at
>> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
>>              at java.lang.Iterable.forEach(Iterable.java:75)
>>              at
>> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
>>              at
>> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson.processElement(ExtractRequiredJson.java:67)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.Filter.filterElement(Filter.java:49)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>>              at
>> org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211)
>>              at
>> org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>>              at
>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>              at
>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85)
>>              at
>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>
>>
>> 2.            Connection timed out:
>>
>> ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for
>> connection string (
>> internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181)
>> and timeout (15000) / elapsed (58732)
>> org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
>> KeeperErrorCode = ConnectionLoss
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>>              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>              at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>              at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>              at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>              at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>              at java.lang.Thread.run(Thread.java:748)
>>
>>
>> 3.            com.amazonaws.AbortedException
>>
>>              Caused by: java.io.IOException: com.amazonaws.AbortedException:
>>              at
>> org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153)
>>              at
>> org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127)
>>              at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
>>              at java.nio.channels.Channels.writeFully(Channels.java:101)
>>              at java.nio.channels.Channels.access$000(Channels.java:61)
>>              at java.nio.channels.Channels$1.write(Channels.java:174)
>>              at org.apache.beam.sdk.io
>> .parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452)
>>              at org.apache.beam.sdk.io
>> .parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447)
>>              at
>> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
>>              at
>> org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
>>              at
>> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
>>              at
>> org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
>>              at
>> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
>>              at
>> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>>              at
>> org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
>>              at
>> org.apache.beam.sdk.io.parquet.ParquetIO$Sink.flush(ParquetIO.java:394)
>>              at
>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400)
>>              at
>> org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1006)
>>              at
>> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780)
>>
>>
>>
>> 4.            Connection unexpectedly closed by remote task manager:
>>   WARN  o.a.flink.runtime.taskmanager.Task -
>> FileIO.Write/WriteFiles/GatherTempFileResults/Drop
>> key/Values/Map/ParMultiDo(Anonymous) ->
>> FileIO.Write/WriteFiles/GatherTempFileResults/Gather
>> bundles/ParMultiDo(GatherBundlesPerWindow) ->
>> FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair
>> with random key/ParMultiDo(AssignShard) (5/5)#0
>> (b1f59ef12b569d904c28de21a4087655) switched from RUNNING to FAILED.
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> 10.35.134.92/10.35.134.92:33413'. This might indicate that the remote
>> task manager was lost.
>>
>>
>> 5.            Checkpoints are failing with IOExceptions: After a few
>> restarts checkpoints start failing with IOExceptions.
>>
>> Caused by: java.io.IOException: Interrupted while waiting for buffer
>>              at
>> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
>>              at
>> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
>>              at
>> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
>>              at
>> org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
>>              at
>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>>              at
>> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
>>              at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
>>              ... 176 common frames omitted
>>
>>
>>         Just wanted to know if anyone has experienced these kinds of issues
>> and how we can solve them.
>>
>>
>>
>>         Thanks,
>>          Sandeep
>>

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Sandeep, hi all,

I'm getting back to you on this question after a check on Beam:

ParquetIO supports setting the Parquet row group size (see for example 
https://www.dremio.com/tuning-parquet/).
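
To see why the row group size drives memory use, here is a back-of-envelope 
sketch. All numbers are illustrative assumptions, not measurements from this 
pipeline: a Parquet writer buffers roughly one row group in memory per open 
file before flushing, so buffering scales with shards times row group size 
(real usage also depends on page buffers, dictionary encoding, and column 
count -- 320 here).

```java
// Back-of-envelope estimate of Parquet writer buffering (illustrative only).
public class ParquetBufferEstimate {

    /** Upper-bound sketch: each shard's open writer buffers up to one row group. */
    public static long estimateBufferedBytes(int openShards, long rowGroupBytes) {
        return (long) openShards * rowGroupBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        // 5 shards with 128 MiB row groups: 5 * 128 = 640 MiB buffered at once.
        System.out.println(estimateBufferedBytes(5, 128 * mib) / mib + " MiB");
        // 5 shards with the 1 MiB default mentioned below: only 5 MiB buffered.
        System.out.println(estimateBufferedBytes(5, 1 * mib) / mib + " MiB");
    }
}
```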

The default row group size is 1024*1024 bytes. Have you tried playing with 
different values of it using Sink#withRowGroupSize(), in particular 
matching your S3 filesystem block size?
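
Applied to the write snippet from the original message, that tuning would 
look roughly like this (a sketch only, assuming ParquetIO.Sink#withRowGroupSize(int) 
takes a size in bytes in your Beam version; getOutput_schema(), outputPath and 
CustomFileNaming are the user's own code from the original snippet):

```java
// Sketch: same pipeline as above, with an explicit row group size.
int rowGroupSize = 64 * 1024 * 1024; // hypothetical example value: 64 MiB

parquetRecord.apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(getOutput_schema())
                .withRowGroupSize(rowGroupSize))
        .to(outputPath.isEmpty() ? outputPath() : outputPath)
        .withNumShards(5)
        .withNaming(new CustomFileNaming("snappy.parquet")));
```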

Also, you should set your Beam logs to debug level, as the IO emits 
"allocated memory" and "too much memory used" debug and warning 
messages.
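
For example, with a log4j.properties configuration (logger names are a guess 
based on the packages in the stack traces; adjust to your logging backend):

```properties
# Enable debug logging for Beam's ParquetIO and the underlying Parquet writer
log4j.logger.org.apache.beam.sdk.io.parquet=DEBUG
log4j.logger.org.apache.parquet=DEBUG
```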

Best

Etienne Chauchot

>>>
>>>              at
>>> com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown 
>>>
>>> Source)
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211)
>>>              at
>>> org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown 
>>>
>>> Source)
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown 
>>>
>>> Source)
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) 
>>>
>>>              at
>>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85) 
>>>
>>>              at
>>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown 
>>>
>>> Source)
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) 
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 2. Connection timed out:
>>>
>>> ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for connection string (internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181) and timeout (15000) / elapsed (58732)
>>> org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
>>>     at org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
>>>     at org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
>>>     at org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
>>>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
>>>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>>>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>>>     at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> 3. com.amazonaws.AbortedException:
>>>
>>> Caused by: java.io.IOException: com.amazonaws.AbortedException:
>>>     at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153)
>>>     at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127)
>>>     at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
>>>     at java.nio.channels.Channels.writeFully(Channels.java:101)
>>>     at java.nio.channels.Channels.access$000(Channels.java:61)
>>>     at java.nio.channels.Channels$1.write(Channels.java:174)
>>>     at org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452)
>>>     at org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447)
>>>     at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
>>>     at org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
>>>     at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
>>>     at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
>>>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
>>>     at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>>>     at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
>>>     at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.flush(ParquetIO.java:394)
>>>     at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400)
>>>     at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1006)
>>>     at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780)
>>>
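The trace above fails inside ParquetWriter.close, where a whole buffered row group is flushed at once. A rough back-of-the-envelope sketch of why wide Parquet writes like this are memory-hungry: each open writer buffers roughly one row group on the heap before flushing, plus per-column page and dictionary buffers, and with 320 columns that per-column overhead multiplies quickly. All concrete numbers below (writer count, overhead per column) are hypothetical placeholders, not measurements from this thread.

```java
// Illustrative heap estimate for concurrent Parquet writers.
// Assumption: each writer holds ~one row group in memory plus
// per-column buffer overhead until the row group is flushed.
public class ParquetHeapEstimate {

    static long estimateBytes(int openWriters, long rowGroupBytes,
                              int columns, long perColumnOverheadBytes) {
        // buffered row group + per-column buffers, for every open writer
        return openWriters * (rowGroupBytes + columns * perColumnOverheadBytes);
    }

    public static void main(String[] args) {
        // e.g. 5 open writers, 128 MB row groups, 320 columns,
        // ~1 MB of page/dictionary buffers per column (all assumed)
        long bytes = estimateBytes(5, 128L * 1024 * 1024, 320, 1L * 1024 * 1024);
        System.out.println(bytes / (1024 * 1024) + " MB"); // prints "2240 MB"
    }
}
```

Even with these modest assumptions the writers alone account for over 2 GB of heap, before the rest of the pipeline, which is consistent with the 8 GB heap pressure described earlier.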
>>> 4. Connection unexpectedly closed by remote task manager:
>>>
>>> WARN  o.a.flink.runtime.taskmanager.Task - FileIO.Write/WriteFiles/GatherTempFileResults/Drop key/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/GatherTempFileResults/Gather bundles/ParMultiDo(GatherBundlesPerWindow) -> FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (5/5)#0 (b1f59ef12b569d904c28de21a4087655) switched from RUNNING to FAILED.
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '10.35.134.92/10.35.134.92:33413'. This might indicate that the remote task manager was lost.
>>>
>>> 5. Checkpoints are failing with IOExceptions: after a few restarts, checkpoints start failing with IOExceptions.
>>>
>>> Caused by: java.io.IOException: Interrupted while waiting for buffer
>>>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
>>>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
>>>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
>>>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>>>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
>>>     ... 176 common frames omitted
>>>
>>> Just wanted to know if anyone has experienced these kinds of issues and how we can solve them.
>>>
>>>
>>>
>>>         Thanks,
>>>          Sandeep
>>>
>>>
>>>
>>>
>>>