Posted to dev@flink.apache.org by "Kathula, Sandeep" <Sa...@intuit.com.INVALID> on 2021/09/14 15:21:32 UTC

Beam with Flink runner - Issues when writing to S3 in Parquet Format

Hi,
   We have a simple Beam application which reads from Kafka, converts to Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We apply a fixed window of 5 minutes after conversion to PCollection<GenericRecord> and then write to S3. Our data has around 320 columns. Our intention is to write large files of 128 MB or more, so that we avoid the small-file problem when reading back from Hive. But from what we observed, writing to S3 takes too much memory (an 8 GB heap is not enough to write 50 MB files, and the job goes OOM). When I increase the heap to 32 GB, it takes a long time to write records to S3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec
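One plausible reason for the heap pressure (a back-of-envelope sketch, not from the original post): the Parquet writer buffers roughly a full row group in heap per open writer before flushing, so heap demand scales with shards times concurrently open windows times row group size. All the numbers below are illustrative assumptions, and per-column page/dictionary buffers (320 columns here) add further overhead on top.

```java
public class ParquetHeapEstimate {
    public static void main(String[] args) {
        // Assumptions (illustrative only): one full row group buffered in heap
        // per open Parquet writer before it flushes to S3.
        long rowGroupBytes = 128L * 1024 * 1024; // target file/row-group size
        int shards = 5;                          // withNumShards(5) from the pipeline
        int concurrentWindows = 2;               // current window + previous one still flushing

        long heapForBuffers = rowGroupBytes * shards * concurrentWindows;
        System.out.println(heapForBuffers / (1024 * 1024) + " MB"); // prints "1280 MB"
    }
}
```

Even this rough estimate is over a gigabyte of buffered row-group data alone, before per-column encoder overhead, which is consistent with the OOMs observed at larger target file sizes.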

Code block to write to S3:
PCollection<GenericRecord> parquetRecord = ………………………….

parquetRecord.apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath.isEmpty() ? outputPath() : outputPath)
                .withNumShards(5)
                .withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


1. UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
            at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
            at com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
            at java.lang.Iterable.forEach(Iterable.java:75)
            at com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
            at com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson.processElement(ExtractRequiredJson.java:67)
            at com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at com.intuit.data.platform.process.thrivev2.filter.Filter.filterElement(Filter.java:49)
            at com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
            at org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211)
            at org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
            at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
            at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85)
            at com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)

2. Connection timed out:

ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for connection string (internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181) and timeout (15000) / elapsed (58732)
org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
            at org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
            at org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
            at org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
            at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
            at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
            at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
            at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)

3. com.amazonaws.AbortedException:

            Caused by: java.io.IOException: com.amazonaws.AbortedException:
            at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153)
            at org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127)
            at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
            at java.nio.channels.Channels.writeFully(Channels.java:101)
            at java.nio.channels.Channels.access$000(Channels.java:61)
            at java.nio.channels.Channels$1.write(Channels.java:174)
            at org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452)
            at org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447)
            at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
            at org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
            at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
            at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
            at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
            at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
            at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
            at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.flush(ParquetIO.java:394)
            at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400)
            at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1006)
            at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780)

4. Connection unexpectedly closed by remote task manager:
 WARN  o.a.flink.runtime.taskmanager.Task - FileIO.Write/WriteFiles/GatherTempFileResults/Drop key/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/GatherTempFileResults/Gather bundles/ParMultiDo(GatherBundlesPerWindow) -> FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (5/5)#0 (b1f59ef12b569d904c28de21a4087655) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '10.35.134.92/10.35.134.92:33413'. This might indicate that the remote task manager was lost.

5. Checkpoint failures: after a few restarts, checkpoints begin failing with IOExceptions.

Caused by: java.io.IOException: Interrupted while waiting for buffer
            at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
            at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
            at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
            at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
            at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
            at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
            at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
            ... 176 common frames omitted

       Just wanted to know if anyone has experienced these kinds of issues and how they can be solved.



       Thanks,
        Sandeep

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Sandeep, hi all,

I'm getting back to you on this question after a check on Beam:

ParquetIO supports setting the Parquet row group size (see for example
https://www.dremio.com/tuning-parquet/).

The default row group size is 1024*1024 bytes. Have you tried different
values via Sink#withRowGroupSize(), in particular matching your S3
filesystem block size?
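Concretely, applied to the pipeline from the original post, that would look roughly like this (a sketch, not tested; the 64 MB value is an arbitrary example, and ParquetIO.Sink#withRowGroupSize takes the size in bytes):

```java
// Sketch only: same pipeline as in the original post, with an explicit
// row group size. 64 MB is an assumed example value; smaller row groups
// mean less heap buffered per open writer, at the cost of more row
// groups per output file.
int rowGroupBytes = 64 * 1024 * 1024;

parquetRecord.apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(getOutput_schema()).withRowGroupSize(rowGroupBytes))
        .to(outputPath.isEmpty() ? outputPath() : outputPath)
        .withNumShards(5)
        .withNaming(new CustomFileNaming("snappy.parquet")));
```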

Also, set your Beam logs to debug level: the IO emits "allocated memory"
and "too much memory used" debug and warning messages.
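If you use log4j, an override along these lines would surface those messages (the exact logger names are assumptions; adjust to your logging backend):

```properties
# Hypothetical log4j.properties override: raise ParquetIO and the Parquet
# writer internals to DEBUG so the memory-related messages show up.
log4j.logger.org.apache.beam.sdk.io.parquet=DEBUG
log4j.logger.org.apache.parquet=DEBUG
```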

Best

Etienne Chauchot

On 15/10/2021 13:02, Etienne Chauchot wrote:
> Hi Sandeep and community,
>
> I'm a PMC member of the Beam community and also a contributor on 
> Flink. I also used parquet in conjunction with Beam pipelines running 
> with Flink Beam runner. So I should be able to help:
>
> I don't think it is a Flink issue as you have a Beam pipeline so you 
> use only Beam IOs that are wrapped by Beam runner inside a Flink 
> operator at (Beam) translation time.
>
> I don't think It is related to S3 either but much more to ParquetIO in 
> Beam (read and write connector).
>
> I'll check at the Beam side and get back to you.
>
> Best
>
> Etienne.
>
> On 30/09/2021 14:42, Till Rohrmann wrote:
>> Hi Sandeep,
>>
>> I am not a Beam expert. The problem might be caused by the used S3
>> filesystem implementation. Have you tried whether the same problem 
>> occurs
>> when using vanilla Flink's latest version? Alternatively, you could also
>> reach out to the Beam community or ask on Flink's user ML whether people
>> have experience with such a problem.
>>
>> Some of the exceptions look as if your network is a bit flaky. You might
>> want to look into the infrastructure you are running on.
>>
>> Cheers,
>> Till
>>
>> On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
>> <Sa...@intuit.com.invalid> wrote:
>>
>>> Hi,
>>> [...]
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139) 
>>>
>>>              at
>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown 
>>>
>>> Source)
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) 
>>>
>>>              at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) 
>>>
>>>              at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) 
>>>
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) 
>>>
>>>              at
>>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85) 
>>>
>>>              at
>>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown 
>>>
>>> Source)
>>>              at
>>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) 
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 2.            Connection timed out:
>>>
>>> ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for
>>> connection string (
>>> internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181) 
>>>
>>> and timeout (15000) / elapsed (58732)
>>> org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException: 
>>>
>>> KeeperErrorCode = ConnectionLoss
>>>              at
>>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225) 
>>>
>>>              at
>>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94) 
>>>
>>>              at
>>> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117) 
>>>
>>>              at
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) 
>>>
>>>              at
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) 
>>>
>>>              at
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) 
>>>
>>>              at
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) 
>>>
>>>              at 
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>              at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
>>>
>>>              at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
>>>
>>>              at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
>>>
>>>              at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
>>>
>>>              at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 3.            com.amazonaws.AbortedException
>>>
>>>              Caused by: java.io.IOException: 
>>> com.amazonaws.AbortedException:
>>>              at
>>> org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153) 
>>>
>>>              at
>>> org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127) 
>>>
>>>              at 
>>> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
>>>              at 
>>> java.nio.channels.Channels.writeFully(Channels.java:101)
>>>              at java.nio.channels.Channels.access$000(Channels.java:61)
>>>              at java.nio.channels.Channels$1.write(Channels.java:174)
>>>              at org.apache.beam.sdk.io
>>> .parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452)
>>>              at org.apache.beam.sdk.io
>>> .parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447)
>>>              at
>>> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46) 
>>>
>>>              at
>>> org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460) 
>>>
>>>              at
>>> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201) 
>>>
>>>              at
>>> org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261) 
>>>
>>>              at
>>> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173) 
>>>
>>>              at
>>> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114) 
>>>
>>>              at
>>> org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
>>>              at org.apache.beam.sdk.io
>>> .parquet.ParquetIO$Sink.flush(ParquetIO.java:394)
>>>              at
>>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400) 
>>>
>>>              at org.apache.beam.sdk.io
>>> .FileBasedSink$Writer.close(FileBasedSink.java:1006)
>>>              at org.apache.beam.sdk.io
>>> .WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780) 
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 4.            Connection unexpectedly closed by remote task manager:
>>>   WARN  o.a.flink.runtime.taskmanager.Task -
>>> FileIO.Write/WriteFiles/GatherTempFileResults/Drop
>>> key/Values/Map/ParMultiDo(Anonymous) ->
>>> FileIO.Write/WriteFiles/GatherTempFileResults/Gather
>>> bundles/ParMultiDo(GatherBundlesPerWindow) ->
>>> FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair 
>>>
>>> with random key/ParMultiDo(AssignShard) (5/5)#0
>>> (b1f59ef12b569d904c28de21a4087655) switched from RUNNING to FAILED.
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
>>>
>>> Connection unexpectedly closed by remote task manager '
>>> 10.35.134.92/10.35.134.92:33413'. This might indicate that the remote
>>> task manager was lost.
>>>
>>>
>>>
>>>
>>>
>>>
>>> 5.            Checkpoints are failing with IOExceptions: After a few
>>> restarts checkpoints start failing with IOExceptions.
>>>
>>> Caused by: java.io.IOException: Interrupted while waiting for buffer
>>>              at org.apache.flink.runtime.io
>>> .network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341) 
>>>
>>>              at org.apache.flink.runtime.io
>>> .network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313) 
>>>
>>>              at org.apache.flink.runtime.io
>>> .network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257) 
>>>
>>>              at org.apache.flink.runtime.io
>>> .network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149) 
>>>
>>>              at org.apache.flink.runtime.io
>>> .network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>>>              at org.apache.flink.runtime.io
>>> .network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) 
>>>
>>>              at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101) 
>>>
>>>              ... 176 common frames omitted
>>>
>>>
>>>
>>>
>>>
>>> Just wanted to know if anyone has experienced these kinds of issues
>>> and how we can solve them.
>>>
>>> Thanks,
>>> Sandeep
>>>

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Sandeep and community,

I'm a PMC member of the Beam community and also a contributor to Flink. 
I have also used Parquet in Beam pipelines running on the Flink Beam 
runner, so I should be able to help:

I don't think this is a Flink issue: since you have a Beam pipeline, you 
are using only Beam IOs, which the Beam runner wraps inside Flink 
operators at (Beam) translation time.

I don't think it is related to S3 either, but rather to ParquetIO in 
Beam (the read and write connector).
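One back-of-envelope check that may explain the OOMs (the buffer sizes below are assumptions based on common parquet-mr defaults, not numbers taken from this thread): a Parquet writer buffers a whole row group on the heap before flushing, plus a page buffer per column, and with several shards open per task those buffers multiply. A minimal sketch of that arithmetic:

```java
// Rough heap estimate for concurrently open Parquet writers.
// Assumptions (hypothetical, not confirmed from the thread):
// parquet-mr buffers roughly one row group per open writer plus a
// page buffer per column, and FileIO.withNumShards(5) can keep
// several writers open at once on a single task.
public class ParquetHeapEstimate {

    /** Bytes buffered on-heap for the given number of open writers. */
    public static long estimateBytes(int openWriters, long rowGroupBytes,
                                     int columns, long pageBytes) {
        return openWriters * (rowGroupBytes + (long) columns * pageBytes);
    }

    public static void main(String[] args) {
        // 5 shards, 128 MB row groups, 320 columns, 1 MB pages.
        long bytes = estimateBytes(5, 128L << 20, 320, 1L << 20);
        System.out.println(bytes / (1L << 20) + " MB"); // prints "2240 MB"
    }
}
```

If this is in the right ballpark, write buffers alone account for a couple of gigabytes per task, before any copies made while uploading to S3; fewer shards, smaller row groups, or less data per window pane would all shrink the footprint.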

I'll check at the Beam side and get back to you.

Best

Etienne.

On 30/09/2021 14:42, Till Rohrmann wrote:
> Hi Sandeep,
>
> I am not a Beam expert. The problem might be caused by the used S3
> filesystem implementation. Have you tried whether the same problem occurs
> when using vanilla Flink's latest version? Alternatively, you could also
> reach out to the Beam community or ask on Flink's user ML whether people
> have experience with such a problem.
>
> Some of the exceptions look as if your network is a bit flakey. You might
> wanna look into the infrastructure you are running on.
>
> Cheers,
> Till
>
> On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
> <Sa...@intuit.com.invalid> wrote:
>
>> Hi,
>>     We have a simple Beam application which reads from Kafka, converts to
>> parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
>> have a fixed window of 5 minutes after conversion to
>> PCollection<GenericRecord> and then writing to S3. We have around 320
>> columns in our data. Our intention is to write large files of size 128MB or
>> more so that it won’t have a small file problem when reading back from
>> Hive. But from what we observed it is taking too much memory to write to S3
>> (giving memory of 8GB to heap is not enough to write 50 MB files and it is
>> going OOM). When I increase memory for heap to 32GB then it take lot of
>> time to write records to s3.
>> For instance it takes:
>>
>> 20 MB file - 30 sec
>> 50 MB file - 1 min 16 sec
>> 75 MB file - 2 min 15 sec
>> 83 MB file - 2 min 40 sec
>>
>> Code block to write to S3:
>> PCollection<GenericRecord> parquetRecord = ………………………….
>>
>> parquetRecord.apply(FileIO.<GenericRecord>write()
>>                  .via(ParquetIO.sink(getOutput_schema()))
>>                  .to(outputPath.isEmpty() ? outputPath() : outputPath)
>>                  .withNumShards(5)
>>                  .withNaming(new CustomFileNaming("snappy.parquet")));
>>
>>
>> We are also getting different exceptions like:
>>
>>
>>    1.  UserCodeException:
>>
>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>>              at
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>              at
>> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
>>              at java.lang.Iterable.forEach(Iterable.java:75)
>>              at
>> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
>>              at
>> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson.processElement(ExtractRequiredJson.java:67)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.Filter.filterElement(Filter.java:49)
>>              at
>> com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>>              at
>> org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211)
>>              at
>> org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>>              at
>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>              at
>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>              at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>              at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>              at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>>              at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>              at
>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85)
>>              at
>> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>              at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>
>>
>>
>>
>>
>>
>> 2.            Connection timed out:
>>
>> ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for
>> connection string (
>> internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181)
>> and timeout (15000) / elapsed (58732)
>> org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
>> KeeperErrorCode = ConnectionLoss
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>>              at
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>>              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>              at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>              at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>              at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>              at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>              at java.lang.Thread.run(Thread.java:748)
>>
>> 3.            com.amazonaws.AbortedException
>>
>>              Caused by: java.io.IOException: com.amazonaws.AbortedException:
>>              at
>> org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153)
>>              at
>> org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127)
>>              at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
>>              at java.nio.channels.Channels.writeFully(Channels.java:101)
>>              at java.nio.channels.Channels.access$000(Channels.java:61)
>>              at java.nio.channels.Channels$1.write(Channels.java:174)
>>              at org.apache.beam.sdk.io
>> .parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452)
>>              at org.apache.beam.sdk.io
>> .parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447)
>>              at
>> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
>>              at
>> org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
>>              at
>> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
>>              at
>> org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
>>              at
>> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
>>              at
>> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>>              at
>> org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
>>              at org.apache.beam.sdk.io
>> .parquet.ParquetIO$Sink.flush(ParquetIO.java:394)
>>              at
>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400)
>>              at org.apache.beam.sdk.io
>> .FileBasedSink$Writer.close(FileBasedSink.java:1006)
>>              at org.apache.beam.sdk.io
>> .WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780)
>>
>> 4.            Connection unexpectedly closed by remote task manager:
>>   WARN  o.a.flink.runtime.taskmanager.Task -
>> FileIO.Write/WriteFiles/GatherTempFileResults/Drop
>> key/Values/Map/ParMultiDo(Anonymous) ->
>> FileIO.Write/WriteFiles/GatherTempFileResults/Gather
>> bundles/ParMultiDo(GatherBundlesPerWindow) ->
>> FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair
>> with random key/ParMultiDo(AssignShard) (5/5)#0
>> (b1f59ef12b569d904c28de21a4087655) switched from RUNNING to FAILED.
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> 10.35.134.92/10.35.134.92:33413'. This might indicate that the remote
>> task manager was lost.
>>
>> 5.            Checkpoints are failing with IOExceptions: After a few
>> restarts checkpoints start failing with IOExceptions.
>>
>> Caused by: java.io.IOException: Interrupted while waiting for buffer
>>              at org.apache.flink.runtime.io
>> .network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
>>              at org.apache.flink.runtime.io
>> .network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
>>              at org.apache.flink.runtime.io
>> .network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
>>              at org.apache.flink.runtime.io
>> .network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
>>              at org.apache.flink.runtime.io
>> .network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>>              at org.apache.flink.runtime.io
>> .network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
>>              at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
>>              ... 176 common frames omitted
>>
>>         Just wanted to know if anyone has experienced these kinds of issues
>> and how we can solve them.
>>
>>         Thanks,
>>          Sandeep
>>

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

Posted by Till Rohrmann <tr...@apache.org>.
Hi Sandeep,

I am not a Beam expert. The problem might be caused by the S3 filesystem
implementation being used. Have you tried whether the same problem occurs
with the latest version of vanilla Flink? Alternatively, you could also
reach out to the Beam community or ask on Flink's user ML whether people
have experience with such a problem.
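
A minimal sketch of the vanilla-Flink comparison suggested here, assuming
Flink 1.12's StreamingFileSink plus the flink-parquet and S3 filesystem
modules are on the classpath; the bucket path, the "records" stream, and
the "schema" variable are placeholders, not part of the original pipeline:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class VanillaFlinkParquetSketch {

    // Attach a bulk-format Parquet sink writing to S3. With bulk formats
    // the sink rolls a new part file on every checkpoint, so the
    // checkpoint interval (not a windowing transform) bounds file size.
    static void attachSink(DataStream<GenericRecord> records, Schema schema) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("s3://my-bucket/output"), // placeholder path
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();
        records.addSink(sink);
    }
}
```

Running the same Kafka-to-Parquet flow through this sink would show whether
the memory growth and slow uploads come from the Beam S3 channel or from
the Parquet/row-group buffering itself.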

Some of the exceptions look as if your network is a bit flaky. You might
want to look into the infrastructure you are running on.

Cheers,
Till

On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
<Sa...@intuit.com.invalid> wrote:

> Hi,
>    We have a simple Beam application which reads from Kafka, converts to
> parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection<GenericRecord> and then writing to S3. We have around 320
> columns in our data. Our intention is to write large files of size 128MB or
> more so that it won’t have a small file problem when reading back from
> Hive. But from what we observed it is taking too much memory to write to S3
> (giving memory of 8GB to heap is not enough to write 50 MB files and it is
> going OOM). When I increase memory for heap to 32GB then it take lot of
> time to write records to s3.
> For instance it takes:
>
> 20 MB file - 30 sec
> 50 MB file - 1 min 16 sec
> 75 MB file - 2 min 15 sec
> 83 MB file - 2 min 40 sec
>
> Code block to write to S3:
> PCollection<GenericRecord> parquetRecord = ………………………….
>
> parquetRecord.apply(FileIO.<GenericRecord>write()
>                 .via(ParquetIO.sink(getOutput_schema()))
>                 .to(outputPath.isEmpty() ? outputPath() : outputPath)
>                 .withNumShards(5)
>                 .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
> We are also getting different exceptions like:
>
>
>   1.  UserCodeException:
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>             at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>             at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>             at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>             at
> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
>             at java.lang.Iterable.forEach(Iterable.java:75)
>             at
> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
>             at
> com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>             at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>             at
> com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson.processElement(ExtractRequiredJson.java:67)
>             at
> com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>             at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>             at
> com.intuit.data.platform.process.thrivev2.filter.Filter.filterElement(Filter.java:49)
>             at
> com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>             at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>             at
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>             at
> org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211)
>             at
> org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>             at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>             at
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>             at
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>             at
> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>             at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>             at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>             at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>             at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>             at
> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85)
>             at
> com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>             at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>
>
> [remainder of the quoted message trimmed; it repeats exceptions 2-5 and
> the closing already shown earlier in this thread]