Posted to user@beam.apache.org by "Koprivica,Preston Blake" <Pr...@cerner.com> on 2019/09/19 21:13:19 UTC

No filesystem found for scheme s3 using FileIO

Hello everyone. I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) and integrating with a third-party filesystem, in this case AWS S3:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

For reference, the write code resembles this:

FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(schema))
                .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
                .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);

I have set up the PipelineOptions with all the relevant AWS options, and the issue does not appear to be related to ParquetIO.sink() directly.  I am able to reliably reproduce the issue using JSON-formatted records and TextIO.sink() as well.

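For reference, the options setup resembles the following (a simplified sketch; it assumes the beam-sdks-java-io-amazon-web-services module is on the classpath, and the region value is only an example):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Parse the args and view them as AwsOptions so the AWS settings (region,
// credentials provider, etc.) are visible to the S3 filesystem registrar.
AwsOptions awsOptions = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(AwsOptions.class);
awsOptions.setAwsRegion("us-east-1"); // example region only
Pipeline pipeline = Pipeline.create(awsOptions);
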
Just trying some different knobs, I went ahead and set the following option:

        write = write.withNoSpilling();

This actually seemed to fix the issue, only to have it reemerge as I scaled up the data set size.  The stack trace, while very similar, reads:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

I’d be interested to hear some theories on the differences/similarities in the stacks.  And lastly, I tried adding the following deprecated option (with and without the withNoSpilling() option):

write = write.withIgnoreWindowing();

This seemed to fix the issue altogether, but aside from having to rely on a deprecated feature, there is the bigger question of why.

In reading through some of the source, it seems a common pattern to have to manually register the pipeline options to seed the filesystem registry during the setup part of the operator lifecycle, e.g.: https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313

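That pattern boils down to something like the following (paraphrasing the linked code, so treat the field name as approximate):

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;

// During the operator's setup, the runner deserializes the pipeline options
// and re-registers the filesystems on the local task manager, so that
// scheme -> FileSystem lookups ("s3", "gs", ...) succeed when a coder later
// calls FileSystems.matchNewResource(...).
PipelineOptions options = serializedOptions.get(); // a SerializablePipelineOptions field
FileSystems.setDefaultPipelineOptions(options);
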
Is it possible that I have hit upon a couple of scenarios where that has not taken place?  Unfortunately, I’m not yet in a position to suggest a fix, but I’m guessing there’s some missing initialization code in one or more of the batch operators.  If this is indeed a legitimate issue, I’ll be happy to log one, but I’ll hold off until the community gets a chance to look at it.

Thanks,

Preston




Re: No filesystem found for scheme s3 using FileIO

Posted by Maximilian Michels <mx...@apache.org>.
Hi Maulik,

Thanks for reporting.

Yes, there is still a chance this bug will occur in batch mode. David 
(CC) has filed an issue for this: 
https://issues.apache.org/jira/browse/BEAM-8577

The current approach of letting every operator ensure FileSystems is
initialized does not always work if we have a shuffle operation. The
reason is that the receiving layer may attempt to decode data using a
coder which relies on FileSystems _before_ the following operator has
been initialized.

I think we have to go with a fix that initializes FileSystems in the 
coder wrapper (CoderTypeInformation). Perhaps we can liven up the 
current PR: https://github.com/apache/beam/pull/10027

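The shape of that fix would be roughly the following (just a sketch, not
the actual PR; plumbing the serialized pipeline options into the
serializer is the part that needs work):

// Sketch: ensure FileSystems is registered before any decode runs on this
// task manager, since decoding can happen before the downstream operator's
// own setup has run. setDefaultPipelineOptions (re)loads the FileSystem
// registrars from the classpath using the given options.
@Override
public T deserialize(DataInputView source) throws IOException {
  FileSystems.setDefaultPipelineOptions(pipelineOptions.get()); // assumed SerializablePipelineOptions field
  return coder.decode(new DataInputViewStream(source));
}
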
Cheers,
Max


Re: No filesystem found for scheme s3 using FileIO

Posted by Maulik Gandhi <mm...@gmail.com>.
Hi Beam Team,

I have a data pipeline (Beam on Flink, running on YARN on AWS EMR) that
reads some data, does a simple filtering operation, and writes the
results to S3.

*Components and Versions:*
- Beam: 2.16.0 (branch: release-2.16.0)
- Flink: 1.8
- YARN on AWS EMR: emr-5.26.0

Below is a snippet of the code:

PCollection<SomeType> someTypes = pipeline.apply(new ReadLatestSomeType());
PCollection<SomeTypeValue> someTypesOutput =
    someTypes.apply(
        Filter.by(
            someTypeElement -> {
              if (some condition) {
                return false;
              }
              return true;
            }));

someTypesOutput
    .apply(
        AvroIO.write(SomeType.class).to(options.getDestination().get()).withOutputFilenames())
    .getPerDestinationOutputFilenames();


Below is the exception I see on the Flink job:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
	at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:106)


My Beam pipeline is failing randomly with the exception I have listed
above.  I saw that the fix was made available on the branch release-2.16.0
(comment on JIRA: https://issues.apache.org/jira/browse/BEAM-8303), which I
have checked out locally.  I did a custom build of Apache Beam
(branch: release-2.16.0) and made sure the artifacts were published to the
local Maven repository so that I can use them in my project.  I imported
the Beam artifacts (2.16.0-MG-SNAPSHOT) into my project and built it.
Once I had the compiled JAR, I kicked off Flink jobs on an AWS EMR
cluster.  The Flink job fails or succeeds randomly (non-deterministically),
without any change in the way I run the command or rebuild my artifact.

I have noticed that if I run the Flink job with a parallelism of 1 it will
not fail, but if I run the same job with a parallelism of 5 it can fail
or succeed.

If anyone can help me out or give me directions I could try, it would be
greatly appreciated.

Thanks.
- Maulik

On Wed, Sep 25, 2019 at 10:53 AM Koprivica,Preston Blake <Preston.B.Koprivica@cerner.com> wrote:

> Not a problem!  Thanks for looking into this.  In reading through the
> source associated with the stacktrace, I also noticed that there's neither
> user code nor Beam-to-Flink lifecycle code available for initialization.
> As far as I could tell, it was pure Flink down to the coders.  Nothing new
> here, but maybe it bolsters confidence in your diagnosis.  I went ahead
> and logged an issue here: https://issues.apache.org/jira/browse/BEAM-8303.
>
> Let me know what I can do to help - I'm happy to test/verify any fixes you
> want to try and review any code (bearing in mind I'm a total newb in the
> beam space).
>
> Thanks again,
> Preston
>
> On 9/25/19, 10:34 AM, "Maximilian Michels" <mx...@apache.org> wrote:
>
>     Hi Preston,
>
>     Sorry about the name mixup, of course I meant to write Preston not
>     Magnus :) See my reply below.
>
>     cheers,
>     Max
>
>     On 25.09.19 08:31, Maximilian Michels wrote:
>     > Hi Magnus,
>     >
>     > Your observation seems to be correct. There is an issue with the file
>     > system registration.
>     >
>     > The two types of errors you are seeing, as well as the successful run,
>     > are just due to the different structure of the generated transforms. The
>     > Flink scheduler will distribute them differently, which results in some
>     > pipelines being placed on task managers which happen to execute the
>     > FileSystems initialization code and others not.
>     >
>     > There is a quick fix to at least initialize the file system in case it
>     > has not been initialized, by adding the loading code here:
>     >
>     > https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
>     >
>     >
>     > However, there we do not have the pipeline options available, which
>     > prevents any configuration. The problem is that the error occurs in the
>     > coder used in a native Flink operation which does not even run user code.
>     >
>     > I believe the only way to fix this is to ship the FileSystems
>     > initialization code in CoderTypeSerializer, where we are sure to execute
>     > it in time for any coders which depend on it.
>     >
>     > Could you file an issue? I'd be happy to fix this then.
>     >
>     > Thanks,
>     > Max
>     >
>     > On 24.09.19 09:54, Chamikara Jayalath wrote:
>     >> As Magnus mentioned, FileSystems are picked up from the class path and
>     >> registered here:
>     >>
>     >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
>     >>
>     >>
>     >> Seems like Flink is invoking this method at the following locations:
>     >>
>     >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
>     >>
>     >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
>     >>
>     >>
>     >> I'm not too familiar with Flink, so I'm not sure why S3 is not
>     >> properly being registered when running the Flink job. CCing some
>     >> folks who are more familiar with Flink.
>     >>
>     >> +Ankur Goenka <ma...@google.com> +Maximilian Michels
>     >> <ma...@apache.org>
>     >>
>     >> Thanks,
>     >> Cham
>     >>
>     >>
>     >> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
>     >> <Preston.B.Koprivica@cerner.com> wrote:
>     >>
>     >>     Thanks for the reply Magnus.
>     >>
>     >>     I'm sorry it wasn't clearer in the original message.  I have
>     >>     added the AWS dependencies and set up the pipeline options with the
>     >>     AWS options.  For the case where I set the write to ignore
>     >>     windowing, everything works.  But the option is deprecated and the
>     >>     comments warn against its usage.
>     >>
>     >>     I'm wondering whether the case where no options are set and I see
>     >>     the error is a case of improperly initialized filesystems in the
>     >>     Flink runner.  Or maybe someone has some different ideas for the
>     >>     culprit.
>     >>
>     >>
>     >>
>     >>
> ------------------------------------------------------------------------
>     >>     *From:* Magnus Runesson <magru@linuxalert.org>
>     >>     *Sent:* Saturday, September 21, 2019 9:06:03 AM
>     >>     *To:* user@beam.apache.org
>     >>     *Subject:* Re: No filesystem found for scheme s3 using FileIO
>     >>
>     >>     Hi!
>     >>
>     >>
>     >>     You are probably missing the S3 filesystem from your classpath.
>     >>
>     >>     If I remember correctly you must include this
>     >>
>     >>     https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
>     >>
>     >>     package in your classpath/fat-jar.
>     >>
>     >>     /Magnus

Re: No filesystem found for scheme s3 using FileIO

Posted by "Koprivica,Preston Blake" <Pr...@cerner.com>.
Not a problem!  Thanks for looking into this.  In reading through the source associated with the stack trace, I also noticed that there's neither user code nor beam-to-flink lifecycle code available for initialization.  As far as I could tell, it was pure Flink down to the coders.  Nothing new here, but maybe it bolsters confidence in your diagnosis.  I went ahead and logged an issue here: https://issues.apache.org/jira/browse/BEAM-8303.

Let me know what I can do to help - I'm happy to test/verify any fixes you want to try and review any code (bearing in mind I'm a total newb in the beam space).

Thanks again,
Preston

On 9/25/19, 10:34 AM, "Maximilian Michels" <mx...@apache.org> wrote:

    Hi Preston,

    Sorry about the name mixup, of course I meant to write Preston not
    Magnus :) See my reply below.

    cheers,
    Max

    On 25.09.19 08:31, Maximilian Michels wrote:
    > Hi Magnus,
    >
    > Your observation seems to be correct. There is an issue with the file
    > system registration.
    >
    > The two types of errors you are seeing, as well as the successful run,
    > are just due to the different structure of the generated transforms. The
    > Flink scheduler will distribute them differently, which results in some
    > pipelines being placed on task managers which happen to execute the
    > FileSystems initialization code and others not.
    >
    > There is a quick fix to at least initialize the file system in case it
    > has not been initialized, by adding the loading code here:
    > https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
    >
    >
    > However, there we do not have the pipeline options available, which
    > prevents any configuration. The problem is that the error occurs in the
    > coder used in a native Flink operation which does not even run user code.
    >
    > I believe the only way to fix this is to ship the FileSystems
    > initialization code in CoderTypeSerializer where we are sure to execute
    > it in time for any coders which depend on it.
    >
    > Could you file an issue? I'd be happy to fix this then.
    >
    > Thanks,
    > Max
    >
    > On 24.09.19 09:54, Chamikara Jayalath wrote:
    >> As Magnus mentioned, FileSystems are picked up from the class path and
    >> registered here.
    >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
    >>
    >>
    >> Seems like Flink is invoking this method at the following locations.
    >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
    >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
    >>
    >>
    >> I'm not too familiar with Flink, so I'm not sure why S3 is not properly
    >> being registered when running the Flink job. CCing some folks who are
    >> more familiar with Flink.
    >>
    >> +Ankur Goenka <ma...@google.com> +Maximilian Michels
    >> <ma...@apache.org>
    >>
    >> Thanks,
    >> Cham
    >>
    >>
    >> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
    >> <Preston.B.Koprivica@cerner.com
    >> <ma...@cerner.com>> wrote:
    >>
    >>     Thanks for the reply Magnus.
    >>
    >>     I'm sorry it wasn't more clear in the original message.  I have
    >>     added the aws dependencies and set up the pipeline options with the
    >>     aws options.   For the case where I set the write to ignore
    >>     windowing, everything works.  But the option is deprecated and the
    >>     comments warn against its usage.
    >>
    >>     I'm wondering if, where no options are set and I see the error,
    >>     that is a case of improperly initialized filesystems in the Flink
    >>     runner.  Or maybe someone has some different ideas for the culprit.
    >>
    >>     Get Outlook for Android <https://aka.ms/ghei36>
    >>
    >>
    >> ------------------------------------------------------------------------
    >>     *From:* Magnus Runesson <magru@linuxalert.org
    >>     <ma...@linuxalert.org>>
    >>     *Sent:* Saturday, September 21, 2019 9:06:03 AM
    >>     *To:* user@beam.apache.org <ma...@beam.apache.org>
    >>     <user@beam.apache.org <ma...@beam.apache.org>>
    >>     *Subject:* Re: No filesystem found for scheme s3 using FileIO
    >>
    >>     Hi!
    >>
    >>
    >>     You probably miss the S3 filesystem in your classpath.
    >>
    >>     If I remember correctly you must include this
    >>     https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
    >>     package in your classpath/fat-jar.
    >>
    >>     /Magnus
    >>
    >>     On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
    >>> [...]





Re: No filesystem found for scheme s3 using FileIO

Posted by Maximilian Michels <mx...@apache.org>.
Hi Preston,

Sorry about the name mixup, of course I meant to write Preston not 
Magnus :) See my reply below.

cheers,
Max

On 25.09.19 08:31, Maximilian Michels wrote:
> Hi Magnus,
> 
> Your observation seems to be correct. There is an issue with the file 
> system registration.
> 
> The two types of errors you are seeing, as well as the successful run, 
> are just due to the different structure of the generated transforms. The 
> Flink scheduler will distribute them differently, which results in some 
> pipelines being placed on task managers which happen to execute the 
> FileSystems initialization code and others not.
> 
> There is a quick fix to at least initialize the file system in case it 
> has not been initialized, by adding the loading code here: 
> https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463 
> 
> 
> However, there we do not have the pipeline options available, which 
> prevents any configuration. The problem is that the error occurs in the 
> coder used in a native Flink operation which does not even run user code.
> 
> I believe the only way to fix this is to ship the FileSystems
> initialization code in CoderTypeSerializer where we are sure to execute 
> it in time for any coders which depend on it.
> 
> Could you file an issue? I'd be happy to fix this then.
> 
> Thanks,
> Max
> 
> On 24.09.19 09:54, Chamikara Jayalath wrote:
>> As Magnus mentioned, FileSystems are picked up from the class path and 
>> registered here.
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480 
>>
>>
>> Seems like Flink is invoking this method at the following locations.
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142 
>>
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63 
>>
>>
>> I'm not too familiar with Flink, so I'm not sure why S3 is not properly
>> being registered when running the Flink job. CCing some folks who are
>> more familiar with Flink.
>>
>> +Ankur Goenka <ma...@google.com> +Maximilian Michels 
>> <ma...@apache.org>
>>
>> Thanks,
>> Cham
>>
>>
>> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake 
>> <Preston.B.Koprivica@cerner.com 
>> <ma...@cerner.com>> wrote:
>>
>>     Thanks for the reply Magnus.
>>
>>     I'm sorry it wasn't more clear in the original message.  I have
>>     added the aws dependencies and set up the pipeline options with the
>>     aws options.   For the case where I set the write to ignore
>>     windowing, everything works.  But the option is deprecated and the
>>     comments warn against its usage.
>>
>>     I'm wondering if, where no options are set and I see the error,
>>     that is a case of improperly initialized filesystems in the Flink
>>     runner.  Or maybe someone has some different ideas for the culprit.
>>
>>     Get Outlook for Android <https://aka.ms/ghei36>
>>
>>     
>> ------------------------------------------------------------------------
>>     *From:* Magnus Runesson <magru@linuxalert.org
>>     <ma...@linuxalert.org>>
>>     *Sent:* Saturday, September 21, 2019 9:06:03 AM
>>     *To:* user@beam.apache.org <ma...@beam.apache.org>
>>     <user@beam.apache.org <ma...@beam.apache.org>>
>>     *Subject:* Re: No filesystem found for scheme s3 using FileIO
>>
>>     Hi!
>>
>>
>>     You probably miss the S3 filesystem in your classpath.
>>
>>     If I remember correctly you must include this
>>     https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
>>     package in your classpath/fat-jar.
>>
>>     /Magnus
>>
>>     On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
>>> [...]

Re: No filesystem found for scheme s3 using FileIO

Posted by Maximilian Michels <mx...@apache.org>.
Hi Magnus,

Your observation seems to be correct. There is an issue with the file 
system registration.

The two types of errors you are seeing, as well as the successful run, 
are just due to the different structure of the generated transforms. The 
Flink scheduler will distribute them differently, which results in some 
pipelines being placed on task managers which happen to execute the 
FileSystems initialization code and others not.
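
The initialization in question is essentially the pattern Preston linked,
where an operator re-registers Beam's file systems from its serialized
pipeline options during setup, before any elements flow. A minimal sketch of
that pattern (class and field names here are illustrative, not Beam's actual
code):

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.io.FileSystems;

class FileSystemAwareOperator /* extends a Flink operator base class */ {
  private final SerializablePipelineOptions serializedOptions;

  FileSystemAwareOperator(SerializablePipelineOptions serializedOptions) {
    this.serializedOptions = serializedOptions;
  }

  // Called when Flink sets the operator up on a task manager, so the
  // FileSystems registry is seeded on that worker before any decode runs.
  public void open() {
    FileSystems.setDefaultPipelineOptions(serializedOptions.get());
  }
}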

There is a quick fix to at least initialize the file system in case it 
has not been initialized, by adding the loading code here: 
https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463

However, there we do not have the pipeline options available, which 
prevents any configuration. The problem is that the error occurs in the 
coder used in a native Flink operation which does not even run user code.

I believe the only way to fix this is to ship the FileSystems
initialization code in CoderTypeSerializer where we are sure to execute 
it in time for any coders which depend on it.
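
A rough sketch of the idea (illustrative only; the helper and its name are
made up, and only default options are available at this point, which is
exactly the configuration limitation mentioned above):

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

final class FileSystemsInit {
  private FileSystemsInit() {}

  // Would be invoked from CoderTypeSerializer before the first
  // (de)serialization, e.g. from a static initializer, so the FileSystems
  // registry is populated on every task manager that touches a Beam coder.
  static void ensureRegistered() {
    // Only default options exist here; user configuration cannot be applied.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
  }
}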

Could you file an issue? I'd be happy to fix this then.

Thanks,
Max

On 24.09.19 09:54, Chamikara Jayalath wrote:
> As Magnus mentioned, FileSystems are picked up from the class path and 
> registered here.
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
> 
> Seems like Flink is invoking this method at the following locations.
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
> 
> I'm not too familiar with Flink, so I'm not sure why S3 is not properly
> being registered when running the Flink job. CCing some folks who are
> more familiar with Flink.
> 
> +Ankur Goenka <ma...@google.com> +Maximilian Michels 
> <ma...@apache.org>
> 
> Thanks,
> Cham
> 
> 
> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake 
> <Preston.B.Koprivica@cerner.com <ma...@cerner.com>> 
> wrote:
> 
>     Thanks for the reply Magnus.
> 
>     I'm sorry it wasn't more clear in the original message.  I have
>     added the aws dependencies and set up the pipeline options with the
>     aws options.   For the case where I set the write to ignore
>     windowing, everything works.  But the option is deprecated and the
>     comments warn against its usage.
> 
>     I'm wondering if, where no options are set and I see the error,
>     that is a case of improperly initialized filesystems in the Flink
>     runner.  Or maybe someone has some different ideas for the culprit.
> 
>     Get Outlook for Android <https://aka.ms/ghei36>
> 
>     ------------------------------------------------------------------------
>     *From:* Magnus Runesson <magru@linuxalert.org
>     <ma...@linuxalert.org>>
>     *Sent:* Saturday, September 21, 2019 9:06:03 AM
>     *To:* user@beam.apache.org <ma...@beam.apache.org>
>     <user@beam.apache.org <ma...@beam.apache.org>>
>     *Subject:* Re: No filesystem found for scheme s3 using FileIO
> 
>     Hi!
> 
> 
>     You probably miss the S3 filesystem in your classpath.
> 
>     If I remember correctly you must include this
>     https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
>     package in your classpath/fat-jar.
> 
>     /Magnus
> 
>     On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
>> [...]

Re: No filesystem found for scheme s3 using FileIO

Posted by Chamikara Jayalath <ch...@google.com>.
As Magnus mentioned, FileSystems are picked up from the class path and
registered here.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480

Seems like Flink is invoking this method at the following locations.
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
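
For reference, the registration those lines perform boils down to the single
call sketched below. Calling it eagerly from your own main() with your
configured options seeds the registry, but only in the JVM that runs it,
which is why it cannot help a task manager that never executes it:

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterFileSystems {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Loads every FileSystem registrar found on the classpath (S3 included,
    // if the amazon-web-services module is present) with the given options.
    FileSystems.setDefaultPipelineOptions(options);
    // ... build and run the pipeline as usual ...
  }
}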

I'm not too familiar with Flink, so I'm not sure why S3 is not properly being
registered when running the Flink job. CCing some folks who are more
familiar with Flink.

+Ankur Goenka <go...@google.com> +Maximilian Michels <mx...@apache.org>

Thanks,
Cham


On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake <
Preston.B.Koprivica@cerner.com> wrote:

> Thanks for the reply Magnus.
>
> I'm sorry it wasn't more clear in the original message.  I have added the
> aws dependencies and set up the pipeline options with the aws options.
> For the case where I set the write to ignore windowing, everything works.
> But the option is deprecated and the comments warn against its usage.
>
> I'm wondering if, where no options are set and I see the error, that is a
> case of improperly initialized filesystems in the Flink runner.  Or maybe
> someone has some different ideas for the culprit.
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> ------------------------------
> *From:* Magnus Runesson <ma...@linuxalert.org>
> *Sent:* Saturday, September 21, 2019 9:06:03 AM
> *To:* user@beam.apache.org <us...@beam.apache.org>
> *Subject:* Re: No filesystem found for scheme s3 using FileIO
>
>
> Hi!
>
>
> You probably miss the S3 filesystem in your classpath.
>
> If I remember correctly you must include this
> https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
> package in your classpath/fat-jar.
>
> /Magnus
> On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
>
> Hello everyone. I’m getting the following error when attempting to use the
> FileIO apis (beam-2.15.0) and integrating with a 3rd party filesystem, in
> this case AWS S3:
>
>
>
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
> For reference, the write code resembles this:
>
>
>
> FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
>                 .via(ParquetIO.sink(schema))
>                 .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
>                 .withSuffix(".parquet");
>
>
>
> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
>
>
>
> I have setup the PipelineOptions with all the relevant AWS options and the
> issue does not appear to be related to ParquetIO.sink() directly.  I am
> able to reliably reproduce the issue using JSON formatted records and
> TextIO.sink(), as well.
>
>
>
> Just trying some different knobs, I went ahead and set the following
> option:
>
>
>
>         write = write.withNoSpilling();
>
>
>
> This actually seemed to fix the issue, only to have it reemerge as I
> scaled up the data set size.  The stack trace, while very similar, reads:
>
>
>
> java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
> I’ll be interested to hear some theories on the differences/similarities
> in the stacks.  And lastly, I tried adding the following deprecated option
> (with and without the withNoSpilling() option):
>
>
>
> write = write.withIgnoreWindowing();
>
>
>
> This seemed to fix the issue altogether but aside from having to rely on a
> deprecated feature, there is the bigger issue of why?
>
>
>
> In reading through some of the source, it seems a common pattern to have
> to manually register the pipeline options to seed the filesystem registry
> during the setup part of the operator lifecycle, e.g.:
> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
>
>
>
>
> Is it possible that I have hit upon a couple scenarios where that has not
> taken place?  Unfortunately, I'm not yet in a position to suggest a fix,
> but I’m guessing there’s some missing initialization code in one or more of
> the batch operators.  If this is indeed a legitimate issue, I’ll be happy
> to log an issue, but I’ll hold off until the community gets a chance to
> look at it.
>
>
>
> Thanks,
>
>    - Preston
>
>
>
>
>

Re: No filesystem found for scheme s3 using FileIO

Posted by "Koprivica,Preston Blake" <Pr...@cerner.com>.
Thanks for the reply Magnus.

I'm sorry it wasn't more clear in the original message.  I have added the aws dependencies and set up the pipeline options with the aws options.   For the case where I set the write to ignore windowing, everything works.  But the option is deprecated and the comments warn against its usage.

I'm wondering if, where no options are set and I see the error, that is a case of improperly initialized filesystems in the Flink runner.  Or maybe someone has some different ideas for the culprit.

________________________________
From: Magnus Runesson <ma...@linuxalert.org>
Sent: Saturday, September 21, 2019 9:06:03 AM
To: user@beam.apache.org <us...@beam.apache.org>
Subject: Re: No filesystem found for scheme s3 using FileIO


Hi!


You are probably missing the S3 filesystem in your classpath.

If I remember correctly you must include this https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services package in your classpath/fat-jar.

/Magnus


Re: No filesystem found for scheme s3 using FileIO

Posted by Magnus Runesson <ma...@linuxalert.org>.
Hi!


You are probably missing the S3 filesystem in your classpath.

If I remember correctly you must include this 
https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services 
package in your classpath/fat-jar.
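
A quick way to verify the registrar is actually visible at runtime is a smoke test along these lines (a rough sketch; the bucket name is made up):

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.aws.options.AwsOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CheckS3Registrar {
        public static void main(String[] args) {
            AwsOptions options = PipelineOptionsFactory.create().as(AwsOptions.class);
            options.setAwsRegion("us-east-1");  // illustrative
            // Registers every filesystem found on the classpath.
            FileSystems.setDefaultPipelineOptions(options);
            // Throws IllegalArgumentException("No filesystem found for scheme s3")
            // if the beam-sdks-java-io-amazon-web-services registrar is missing.
            System.out.println(FileSystems.matchNewResource("s3://example-bucket/key", false));
        }
    }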

/Magnus
