Posted to issues@beam.apache.org by "Preston Koprivica (Jira)" <ji...@apache.org> on 2019/09/23 19:11:00 UTC

[jira] [Created] (BEAM-8303) Filesystems not properly registered using FileIO.write()

Preston Koprivica created BEAM-8303:
---------------------------------------

             Summary: Filesystems not properly registered using FileIO.write()
                 Key: BEAM-8303
                 URL: https://issues.apache.org/jira/browse/BEAM-8303
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-core
    Affects Versions: 2.15.0
            Reporter: Preston Koprivica


I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) and integrating with AWS S3.  I have set up the PipelineOptions with all the relevant AWS options, so the filesystem registry *should* be properly seeded by the time the graph is compiled and executed:
{code:java}
 java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
 {code}
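For context, the options were wired up roughly like this (a minimal sketch, assuming the AwsOptions interface from the beam-sdks-java-io-amazon-web-services module; the region value and args are placeholders, not the exact code):
{code:java}
// Hypothetical reconstruction of the options setup, not the exact code.
AwsOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(AwsOptions.class);
options.setAwsRegion("us-east-1"); // placeholder region
Pipeline pipeline = Pipeline.create(options);{code}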
For reference, the write code resembles this:
{code:java}
 FileIO.Write<Void, GenericRecord> write = FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(schema))
                .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
                .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);{code}
The issue does not appear to be related to ParquetIO.sink(): I can reliably reproduce it with JSON-formatted records and TextIO.sink() as well.  Moreover, AvroIO is affected if the withWindowedWrites() option is added.
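For illustration, the TextIO variant that reproduces the same failure looked roughly like this (a sketch; jsonRecords is a hypothetical PCollection<String> of pre-serialized records):
{code:java}
// Same shape as the Parquet write, but with TextIO.sink() and pre-serialized
// JSON strings; jsonRecords is a PCollection<String> (hypothetical name).
FileIO.Write<Void, String> jsonWrite = FileIO.<String>write()
        .via(TextIO.sink())
        .to(options.getOutputDir())
        .withSuffix(".json");

jsonRecords.apply("WriteJson", jsonWrite);{code}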

Just trying some different knobs, I went ahead and set the following option:
{code:java}
write = write.withNoSpilling();{code}
This actually seemed to fix the issue, only to have it reemerge as I scaled up the data set size.  The stack trace, while very similar, reads:
{code:java}
 java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748){code}

And lastly, I tried adding the following deprecated option (with and without the withNoSpilling() option):
{code:java}
 write = write.withIgnoreWindowing(); {code}
This seemed to fix the issue altogether, but aside from having to rely on a deprecated feature, the bigger question is why it works at all.

In reading through some of the source, it appears to be a common pattern to manually register the pipeline options, seeding the filesystem registry during the setup portion of the operator lifecycle, e.g.: https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
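For reference, that block boils down to something like the following (paraphrased, not the exact DoFnOperator code; SerializablePipelineOptions is the carrier the Flink runner uses to ship options to workers):
{code:java}
// Paraphrase of the linked setup pattern: rehydrate the serialized options on
// the worker and re-seed the filesystem registry before anything (including a
// coder's decode path) can call into FileSystems.
PipelineOptions options = serializedOptions.get(); // SerializablePipelineOptions
FileSystems.setDefaultPipelineOptions(options);    // registers s3://, gs://, etc.{code}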

Is it possible that I have hit upon a couple of scenarios where that registration has not taken place?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)