Posted to user@beam.apache.org by Maulik Gandhi <mm...@gmail.com> on 2019/12/11 20:59:34 UTC

Re: No filesystem found for scheme s3 using FileIO

Hi Beam Team,

I have a data pipeline (Beam on Flink, running on YARN on AWS EMR) which
reads some data, does a simple filtering operation, and writes the results
to S3.

*Components and Versions:*
- Beam: 2.16.0 (branch: release-2.16.0)
- Flink: 1.8
- YARN on AWS EMR: emr-5.26.0

Below is a snippet of the code:

PCollection<SomeType> someTypes = pipeline.apply(new ReadLatestSomeType());
PCollection<SomeType> someTypesOutput =
    someTypes.apply(
        Filter.by(
            someTypeElement -> {
              if (someCondition(someTypeElement)) { // placeholder predicate
                return false;
              }
              return true;
            }));

someTypesOutput
    .apply(
        AvroIO.write(SomeType.class).to(options.getDestination().get()).withOutputFilenames())
    .getPerDestinationOutputFilenames();
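
For reference, the pipeline options are wired up roughly as below. This is
only a sketch: the region value is illustrative, and it assumes the
beam-sdks-java-io-amazon-web-services module (which provides the s3
filesystem) is on the classpath.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch: create the options that the S3 filesystem is registered from.
// "us-east-1" is an illustrative value, not the actual configuration.
S3Options options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(S3Options.class);
options.setAwsRegion("us-east-1");
Pipeline pipeline = Pipeline.create(options);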


Below is the exception I see in the Flink job:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
	at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:106)


My Beam pipeline is failing randomly with the exception listed above.  I
saw that the fix was made available on the branch release-2.16.0 (comment
on JIRA: https://issues.apache.org/jira/browse/BEAM-8303), which I have
checked out locally.  I did a custom build of apache/beam (branch:
release-2.16.0) and made sure the artifacts were published to the local
Maven repository so that I could use them in my project.  I imported the
Beam artifacts (2.16.0-MG-SNAPSHOT) into my project and built it.  Once I
had the compiled JAR, I kicked off Flink jobs on an AWS EMR cluster.  The
Flink job fails or succeeds randomly (non-deterministically), without any
change in the way I run the command or rebuild my artifact.

I have noticed that if I run the Flink job with a parallelism of 1 it does
not fail, but if I run the same job with a parallelism of 5 it can fail or
succeed.

If anyone can help me out or suggest directions I could try, it would be
greatly appreciated.

Thanks.
- Maulik

On Wed, Sep 25, 2019 at 10:53 AM Koprivica,Preston Blake <
Preston.B.Koprivica@cerner.com> wrote:

> Not a problem!  Thanks for looking into this.  In reading through the
> source associated with the stacktrace, I also noticed that there's neither
> user code nor Beam-to-Flink lifecycle code available for initialization.
> As far as I could tell, it was pure Flink down to the coders.  Nothing new
> here, but maybe it bolsters confidence in your diagnosis.  I went ahead
> and logged an issue here: https://issues.apache.org/jira/browse/BEAM-8303.
>
> Let me know what I can do to help - I'm happy to test/verify any fixes you
> want to try and review any code (bearing in mind I'm a total newb in the
> Beam space).
>
> Thanks again,
> Preston
>
> On 9/25/19, 10:34 AM, "Maximilian Michels" <mx...@apache.org> wrote:
>
>     Hi Preston,
>
>     Sorry about the name mixup, of course I meant to write Preston not
>     Magnus :) See my reply below.
>
>     cheers,
>     Max
>
>     On 25.09.19 08:31, Maximilian Michels wrote:
>     > Hi Magnus,
>     >
>     > Your observation seems to be correct. There is an issue with the file
>     > system registration.
>     >
>     > The two types of errors you are seeing, as well as the successful run,
>     > are just due to the different structure of the generated transforms. The
>     > Flink scheduler will distribute them differently, which results in some
>     > pipelines being placed on task managers which happen to execute the
>     > FileSystems initialization code and others not.
>     >
>     > There is a quick fix to at least initialize the file system in case it
>     > has not been initialized, by adding the loading code here:
>     > https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
>     >
>     >
>     > However, there we do not have the pipeline options available, which
>     > prevents any configuration. The problem is that the error occurs in the
>     > coder used in a native Flink operation which does not even run user code.
>     >
>     > I believe the only way to fix this is to ship the FileSystems
>     > initialization code in CoderTypeSerializer where we are sure to execute
>     > it in time for any coders which depend on it.
>     >
>     > Could you file an issue? I'd be happy to fix this then.
>     >
>     > Thanks,
>     > Max
>     >
>     > On 24.09.19 09:54, Chamikara Jayalath wrote:
>     >> As Magnus mentioned, FileSystems are picked up from the class path and
>     >> registered here:
>     >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
>     >>
>     >>
>     >> Seems like Flink is invoking this method at the following locations:
>     >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
>     >> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
>     >>
>     >>
>     >> I'm not too familiar with Flink, so I'm not sure why S3 is not being
>     >> properly registered when running the Flink job. CCing some folks who
>     >> are more familiar with Flink.
>     >>
>     >> +Ankur Goenka <ma...@google.com> +Maximilian Michels
>     >> <ma...@apache.org>
>     >>
>     >> Thanks,
>     >> Cham
>     >>
>     >>
>     >> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
>     >> <Preston.B.Koprivica@cerner.com> wrote:
>     >>
>     >>     Thanks for the reply Magnus.
>     >>
>     >>     I'm sorry it wasn't more clear in the original message.  I have
>     >>     added the aws dependencies and set up the pipeline options with the
>     >>     aws options.  For the case where I set the write to ignore
>     >>     windowing, everything works.  But the option is deprecated and the
>     >>     comments warn against its usage.
>     >>
>     >>     I'm wondering whether the case where no options are set and I see
>     >>     the error is a case of improperly initialized filesystems in the
>     >>     Flink runner.  Or maybe someone has some different ideas for the
>     >>     culprit.
>     >>
> ------------------------------------------------------------------------
>     >>     *From:* Magnus Runesson <magru@linuxalert.org>
>     >>     *Sent:* Saturday, September 21, 2019 9:06:03 AM
>     >>     *To:* user@beam.apache.org
>     >>     *Subject:* Re: No filesystem found for scheme s3 using FileIO
>     >>
>     >>     Hi!
>     >>
>     >>
>     >>     You are probably missing the S3 filesystem in your classpath.
>     >>
>     >>     If I remember correctly you must include this
>     >>     https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
>     >>     package in your classpath/fat-jar.
>     >>
>     >>     /Magnus
>     >>
>     >>     On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
>     >>>
>     >>>     Hello everyone. I’m getting the following error when attempting to
>     >>>     use the FileIO apis (beam-2.15.0) and integrating with a 3rd party
>     >>>     filesystem, in this case AWS S3:
>     >>>
>     >>>     java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     >>>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     >>>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     >>>         at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     >>>         at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
>     >>>         at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
>     >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     >>>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     >>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     >>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     >>>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     >>>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     >>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     >>>         at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
>     >>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     >>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     >>>         at java.lang.Thread.run(Thread.java:748)
>     >>>
>     >>>     For reference, the write code resembles this:
>     >>>
>     >>>     FileIO.Write<?, GenericRecord> write =
>     >>>         FileIO.<GenericRecord>write()
>     >>>             .via(ParquetIO.sink(schema))
>     >>>             .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
>     >>>             .withSuffix(".parquet");
>     >>>
>     >>>     records.apply(String.format("Write(%s)", options.getOutputDir()), write);
>     >>>
>     >>>     I have set up the PipelineOptions with all the relevant AWS options
>     >>>     and the issue does not appear to be related to ParquetIO.sink()
>     >>>     directly.  I am able to reliably reproduce the issue using JSON
>     >>>     formatted records and TextIO.sink(), as well.
>     >>>
>     >>>     Just trying some different knobs, I went ahead and set the
>     >>>     following option:
>     >>>
>     >>>             write = write.withNoSpilling();
>     >>>
>     >>>     This actually seemed to fix the issue, only to have it reemerge as
>     >>>     I scaled up the data set size.  The stack trace, while very
>     >>>     similar, reads:
>     >>>
>     >>>     java.lang.IllegalArgumentException: No filesystem found for scheme s3
>     >>>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>     >>>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>     >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
>     >>>         at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
>     >>>         at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     >>>         at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     >>>         at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     >>>         at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     >>>         at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
>     >>>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     >>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
>     >>>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     >>>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     >>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     >>>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
>     >>>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     >>>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     >>>         at java.lang.Thread.run(Thread.java:748)
>     >>>
>     >>>
>     >>>     I’ll be interested to hear some theories on the
>     >>>     differences/similarities in the stacks.  And lastly, I tried
>     >>>     adding the following deprecated option (with and without the
>     >>>     withNoSpilling() option):
>     >>>
>     >>>     write = write.withIgnoreWindowing();
>     >>>
>     >>>     This seemed to fix the issue altogether, but aside from having to
>     >>>     rely on a deprecated feature, there is the bigger question of why.
>     >>>
>     >>>     In reading through some of the source, it seems a common pattern
>     >>>     to have to manually register the pipeline options to seed the
>     >>>     filesystem registry during the setup part of the operator
>     >>>     lifecycle, e.g.:
>     >>>     https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
>     >>>
>     >>>     Is it possible that I have hit upon a couple scenarios where that
>     >>>     has not taken place?  Unfortunately, I’m not yet in a position to
>     >>>     suggest a fix, but I’m guessing there’s some missing
>     >>>     initialization code in one or more of the batch operators.  If
>     >>>     this is indeed a legitimate issue, I’ll be happy to log an issue,
>     >>>     but I’ll hold off until the community gets a chance to look at it.
>     >>>
>     >>>     Thanks,
>     >>>       * Preston
>     >>>

Re: No filesystem found for scheme s3 using FileIO

Posted by Maximilian Michels <mx...@apache.org>.
Hi Maulik,

Thanks for reporting.

Yes, there is still a chance this bug will occur in batch mode. David 
(CC) has filed an issue for this: 
https://issues.apache.org/jira/browse/BEAM-8577

The current approach of letting every operator ensure FileSystems is
initialized does not always work if we have a shuffle operation. The
reason is that the receiving layer may attempt to decode data using a
coder which relies on FileSystems _before_ the following operator has
been initialized.

I think we have to go with a fix that initializes FileSystems in the
coder wrapper (CoderTypeInformation). Perhaps we can revive the
current PR: https://github.com/apache/beam/pull/10027
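
To make the idea concrete, here is a minimal sketch of such a wrapper. The
class name and structure are illustrative (not the actual PR); it assumes
the serializer is constructed with a SerializablePipelineOptions so the
options are shipped to the task managers along with the coder.

import java.io.IOException;
import java.io.InputStream;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.FileSystems;

// Sketch only: the real CoderTypeSerializer implements Flink's full
// TypeSerializer contract. The essential part is the registration call
// on the decode path, which runs on the task manager.
public class FileSystemsAwareDeserializer<T> implements java.io.Serializable {
  private final Coder<T> coder;
  private final SerializablePipelineOptions options;

  public FileSystemsAwareDeserializer(Coder<T> coder, SerializablePipelineOptions options) {
    this.coder = coder;
    this.options = options;
  }

  public T deserialize(InputStream in) throws IOException {
    // Re-run the ServiceLoader-based filesystem registration (including the
    // s3 scheme) before decoding, so a coder that calls
    // FileSystems.matchNewResource() cannot hit "No filesystem found for
    // scheme s3" on a task manager that never ran the initialization code.
    FileSystems.setDefaultPipelineOptions(options.get());
    return coder.decode(in);
  }
}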

Cheers,
Max

On 11.12.19 21:59, Maulik Gandhi wrote:
> [full quote of the message above trimmed]