Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/09/30 09:12:00 UTC

[jira] [Work logged] (BEAM-8303) Filesystems not properly registered using FileIO.write()

     [ https://issues.apache.org/jira/browse/BEAM-8303?focusedWorklogId=320405&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320405 ]

ASF GitHub Bot logged work on BEAM-8303:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Sep/19 09:11
            Start Date: 30/Sep/19 09:11
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #9688: [BEAM-8303] Ensure FileSystems registration code runs in non UDF Flink operators
URL: https://github.com/apache/beam/pull/9688
 
 
   The FileBasedSink$FileResultCoder depends on the FileSystems code being
   initialized. We had previously assumed that this was only necessary for
   user-defined code, but as it stands, coders may also access the file system.
   
   Without this, the coder may fail during decoding with the following:
   
   ```
   Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme s3
   	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
   	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
   	at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
   	at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
   	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
   	at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
   	at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
   	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
   	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
   	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
   	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
   	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
   	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
   	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
   	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
   	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
   	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
   	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
   	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
   	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
   	at java.lang.Thread.run(Thread.java:745)
   
   ```
   
   To mitigate such failures, we should always make sure the FileSystems code is
   initialized in the current Task, because the class loaders of the individual
   Tasks are isolated from each other.
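
   A minimal sketch of the failure mode, under loud assumptions: the classes
   below are a toy model written for illustration and are not Beam or Flink
   APIs. `SchemeRegistry` stands in for the static scheme-to-filesystem map,
   and `Task` stands in for a Flink Task whose class loader holds its own copy
   of that map. In Beam itself, the call that seeds the registry is
   `FileSystems.setDefaultPipelineOptions(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model only: none of these classes are Beam or Flink APIs. */
public class RegistrySketch {

    /** Stands in for the static scheme-to-filesystem map; each class loader has its own copy. */
    static final class SchemeRegistry {
        private final Map<String, String> fileSystemsByScheme = new ConcurrentHashMap<>();

        void register(String scheme, String fileSystemName) {
            fileSystemsByScheme.put(scheme, fileSystemName);
        }

        String lookup(String uri) {
            int separator = uri.indexOf("://");
            String scheme = separator < 0 ? "file" : uri.substring(0, separator);
            String fileSystem = fileSystemsByScheme.get(scheme);
            if (fileSystem == null) {
                // Same message shape as the error in the stack trace above.
                throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
            }
            return fileSystem;
        }
    }

    /** Stands in for a Task: it owns an isolated registry that must be seeded before decoding. */
    static final class Task {
        private final SchemeRegistry registry = new SchemeRegistry();
        private final boolean registersOnOpen;

        Task(boolean registersOnOpen) {
            this.registersOnOpen = registersOnOpen;
        }

        /** Hypothetical lifecycle hook; only some operators seed the registry here. */
        void open() {
            if (registersOnOpen) {
                registry.register("s3", "S3FileSystem");
            }
        }

        /** Models a coder that resolves a resource path while decoding. */
        String decode(String encodedResourceId) {
            return registry.lookup(encodedResourceId);
        }
    }

    public static void main(String[] args) {
        Task udfTask = new Task(true);
        udfTask.open();
        System.out.println(udfTask.decode("s3://bucket/part-0"));

        Task nonUdfTask = new Task(false);
        nonUdfTask.open();
        try {
            nonUdfTask.decode("s3://bucket/part-0");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   The second Task never registers the `s3` scheme, so decoding fails there
   even though the first Task was initialized correctly. Nothing is shared
   between the two registries, which is why initialization has to happen in
   every Task rather than once per job.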
   
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 320405)
    Remaining Estimate: 0h
            Time Spent: 10m

> Filesystems not properly registered using FileIO.write()
> --------------------------------------------------------
>
>                 Key: BEAM-8303
>                 URL: https://issues.apache.org/jira/browse/BEAM-8303
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.15.0
>            Reporter: Preston Koprivica
>            Assignee: Maximilian Michels
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) with AWS S3.  I have set up the PipelineOptions with all the relevant AWS options, so the filesystem registry **should** be properly seeded by the time the graph is compiled and executed:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>  {code}
> For reference, the write code resembles this:
> {code:java}
>  FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
>                 .via(ParquetIO.sink(schema))
>                 .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
>                 .withSuffix(".parquet");
> records.apply(String.format("Write(%s)", options.getOutputDir()), write);{code}
> The issue does not appear to be related to ParquetIO.sink().  I am able to reliably reproduce the issue using JSON-formatted records and TextIO.sink() as well.  Moreover, AvroIO is affected if the withWindowedWrites() option is added.
> Just trying some different knobs, I went ahead and set the following option:
> {code:java}
> write = write.withNoSpilling();{code}
> This actually seemed to fix the issue, only to have it reemerge as I scaled up the data set size.  The stack trace, while very similar, reads:
> {code:java}
>  java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748){code}
>  
> And lastly, I tried adding the following deprecated option (with and without the withNoSpilling() option):
> {code:java}
>  write = write.withIgnoreWindowing(); {code}
> This seemed to fix the issue altogether, but aside from having to rely on a deprecated feature, the bigger question is why it works.
>  
> In reading through some of the source, it seems a common pattern to have to manually register the pipeline options to seed the filesystem registry during the setup part of the operator lifecycle, e.g.: https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
>  
> Is it possible that I have hit upon a couple of scenarios where that has not taken place?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)