Posted to issues@beam.apache.org by "Preston Koprivica (Jira)" <ji...@apache.org> on 2019/09/23 19:14:00 UTC
[jira] [Commented] (BEAM-8303) Filesystems not properly registered using FileIO.write()
[ https://issues.apache.org/jira/browse/BEAM-8303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16936130#comment-16936130 ]
Preston Koprivica commented on BEAM-8303:
-----------------------------------------
I'll defer to the experts on the priority of this issue. Currently, I am able to work around it by setting FileIO.write().withIgnoreWindowing(), which is also the default for AvroIO ([https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L516]), and I suspect for other FileBasedSink APIs as well.
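For anyone triaging: the failure mode in the stack traces below is that FileResultCoder.decode() calls FileSystems.matchNewResource() on a worker JVM where the scheme registry was never seeded from the pipeline options. The following is a simplified, self-contained model of that lookup, not Beam's actual code; the class and method names here are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Beam's scheme -> filesystem registry, to
// illustrate the failure mode only. Beam's real registry is populated
// from pipeline options via FileSystemRegistrars; these names are
// illustrative assumptions, not Beam's API.
class SchemeRegistry {
    private final Map<String, String> schemeToFileSystem = new HashMap<>();

    // Analogous to seeding the registry from pipeline options
    // (e.g. what the DoFnOperator setup code linked below achieves).
    void register(String scheme, String fileSystem) {
        schemeToFileSystem.put(scheme, fileSystem);
    }

    // Analogous to the internal lookup: throws if a worker decodes a
    // FileResult before the registry was seeded on that JVM.
    String lookup(String scheme) {
        String fs = schemeToFileSystem.get(scheme);
        if (fs == null) {
            throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
        }
        return fs;
    }
}

public class RegistryDemo {
    public static void main(String[] args) {
        // A worker JVM whose setup hook never ran: lookup fails.
        SchemeRegistry fresh = new SchemeRegistry();
        try {
            fresh.lookup("s3");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "No filesystem found for scheme s3"
        }

        // A worker JVM that seeded the registry during operator setup: lookup succeeds.
        SchemeRegistry seeded = new SchemeRegistry();
        seeded.register("s3", "S3FileSystem");
        System.out.println(seeded.lookup("s3")); // prints "S3FileSystem"
    }
}
```

Under this model, withIgnoreWindowing() would "fix" the problem only because it steers the write onto a code path that never deserializes FileResults on an unseeded worker.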
> Filesystems not properly registered using FileIO.write()
> --------------------------------------------------------
>
> Key: BEAM-8303
> URL: https://issues.apache.org/jira/browse/BEAM-8303
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.15.0
> Reporter: Preston Koprivica
> Priority: Major
>
> I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) with AWS S3. I have set up the PipelineOptions with all the relevant AWS options, so the filesystem registry **should** be properly seeded by the time the graph is compiled and executed:
> {code:java}
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
> at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> For reference, the write code resembles this:
> {code:java}
> FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
> .via(ParquetIO.sink(schema))
> .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
> .withSuffix(".parquet");
> records.apply(String.format("Write(%s)", options.getOutputDir()), write);{code}
> The issue does not appear to be related to ParquetIO.sink(): I am able to reliably reproduce it using JSON-formatted records and TextIO.sink() as well. Moreover, AvroIO is affected if the withWindowedWrites() option is added.
> Just trying some different knobs, I went ahead and set the following option:
> {code:java}
> write = write.withNoSpilling();{code}
> This actually seemed to fix the issue, only for it to reemerge as I scaled up the data set size. The stack trace, while very similar, reads:
> {code:java}
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748){code}
>
> And lastly, I tried adding the following deprecated option (with and without the withNoSpilling() option):
> {code:java}
> write = write.withIgnoreWindowing(); {code}
> This seemed to fix the issue altogether, but aside from having to rely on a deprecated feature, there is the bigger question of why this works at all.
>
> In reading through some of the source, it seems to be a common pattern to manually register the pipeline options, seeding the filesystem registry during the setup part of the operator lifecycle, e.g.: [https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313]
>
> Is it possible that I have hit upon a couple of scenarios where that registration has not taken place?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)