Posted to user@beam.apache.org by Maulik Gandhi <mm...@gmail.com> on 2019/12/11 20:59:34 UTC
Re: No filesystem found for scheme s3 using FileIO
Hi Beam Team,
I have a data pipeline (Beam on Flink running on YARN on AWS EMR), which
reads some data and does a simple filtering operation and writes the data
to data source S3.
*Components and Versions:*
- Beam: 2.16.0 (branch: release-2.16.0)
- Flink: 1.8
- YARN on AWS EMR: emr-5.26.0
Below is a snippet of code
PCollection<SomeType> someTypes = pipeline.apply(new ReadLatestSomeType());
PCollection<SomeTypeValue> someTypesOutput =
    someTypes.apply(
        Filter.by(
            someTypeElement -> {
              if (some condition) {
                return false;
              }
              return true;
            }));
someTypesOutput
    .apply(
        AvroIO.write(SomeType.class).to(options.getDestination().get()).withOutputFilenames())
    .getPerDestinationOutputFilenames();
Below is the exception I see on the Flink job:
java.lang.IllegalArgumentException: No filesystem found for scheme s3
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:106)
My Beam pipeline is failing randomly with the exception listed above. I
saw that the fix was made available on the branch release-2.16.0
(comment on JIRA: https://issues.apache.org/jira/browse/BEAM-8303),
which I have checked out locally. I did a custom build of Apache Beam
(branch: release-2.16.0) and made sure the artifacts were published to
the local Maven repository so that I could use them in my project. I
imported the Beam artifacts (2.16.0-MG-SNAPSHOT) into my project and
built it. Once I had the compiled JAR, I kicked off Flink jobs on an AWS
EMR cluster. The Flink job fails or succeeds randomly
(non-deterministically), without any change in the way I run the
command or rebuild my artifact.
I have noticed that if I run the Flink job with a parallelism of 1 it
will not fail, but if I run the same job with a parallelism of 5 it can
fail or succeed.
If anyone can help me out or point me at directions I could try, it
would be greatly appreciated.
Thanks.
- Maulik
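The non-deterministic behavior described above is consistent with an initialization race on a shared registry. As a rough illustration only (the class and method names below are invented for this sketch and are not Beam's actual API), a static scheme-to-filesystem map that some workers populate and others only consult reproduces the same error:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the suspected race: a static scheme -> filesystem
// registry that is only populated by an explicit init step. A coder
// that resolves a path before init has run fails exactly like
// "No filesystem found for scheme s3". All names are illustrative.
public class RegistryRace {
  static final Map<String, String> REGISTRY = new ConcurrentHashMap<>();

  // What the runner does on *some* task managers.
  static void initFileSystems() {
    REGISTRY.put("file", "LocalFileSystem");
    REGISTRY.put("s3", "S3FileSystem");
  }

  // What a coder effectively does during decode.
  static String resolve(String scheme) {
    String fs = REGISTRY.get(scheme);
    if (fs == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return fs;
  }

  public static void main(String[] args) {
    try {
      resolve("s3"); // decode before init: the failing task manager
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
    initFileSystems(); // decode after init: the succeeding one
    System.out.println(resolve("s3"));
  }
}
```

With more parallelism there are more workers that may hit the decode path before any init step, which would match the parallelism-1-succeeds / parallelism-5-sometimes-fails observation.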
On Wed, Sep 25, 2019 at 10:53 AM Koprivica,Preston Blake <
Preston.B.Koprivica@cerner.com> wrote:
> Not a problem! Thanks for looking into this. In reading through the
> source associated with the stacktrace, I also noticed that there's neither
> user-code, nor beam-to-flink lifecycle code available for initialization.
> As far as I could tell, it was pure flink down to the coders. Nothing new
> here, but maybe it bolsters confidence in your diagnosis. I went ahead
> and logged an issue here: https://issues.apache.org/jira/browse/BEAM-8303.
>
> Let me know what I can do to help - I'm happy to test/verify any fixes you
> want to try and review any code (bearing in mind I'm a total newb in the
> beam space).
>
> Thanks again,
> Preston
>
> On 9/25/19, 10:34 AM, "Maximilian Michels" <mx...@apache.org> wrote:
>
> Hi Preston,
>
> Sorry about the name mixup, of course I meant to write Preston not
> Magnus :) See my reply below.
>
> cheers,
> Max
>
> On 25.09.19 08:31, Maximilian Michels wrote:
> > Hi Magnus,
> >
> > Your observation seems to be correct. There is an issue with the file
> > system registration.
> >
> > The two types of errors you are seeing, as well as the successful run,
> > are just due to the different structure of the generated transforms. The
> > Flink scheduler will distribute them differently, which results in some
> > pipelines being placed on task managers which happen to execute the
> > FileSystems initialization code and others not.
> >
> > There is a quick fix to at least initialize the file system in case it
> > has not been initialized, by adding the loading code here:
> >
> https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
> >
> >
> > However, there we do not have the pipeline options available, which
> > prevents any configuration. The problem is that the error occurs in the
> > coder used in a native Flink operation which does not even run user code.
> >
> > I believe the only way to fix this is to ship the FileSystems
> > initialization code in CoderTypeSerializer where we are sure to execute
> > it in time for any coders which depend on it.
> >
> > Could you file an issue? I'd be happy to fix this then.
> >
> > Thanks,
> > Max
> >
> > On 24.09.19 09:54, Chamikara Jayalath wrote:
> >> As Magnus mentioned, FileSystems are picked up from the class path
> >> and registered here.
> >>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
> >>
> >>
> >> Seems like Flink is invoking this method at the following locations.
> >>
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
> >>
> >>
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
> >>
> >>
> >> I'm not too familiar with Flink, so I'm not sure why S3 is not
> >> properly being registered when running the Flink job. CCing some
> >> folks who are more familiar with Flink.
> >>
> >> +Ankur Goenka <ma...@google.com> +Maximilian Michels
> >> <ma...@apache.org>
> >>
> >> Thanks,
> >> Cham
> >>
> >>
> >> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
> >> <Preston.B.Koprivica@cerner.com
> >> <ma...@cerner.com>> wrote:
> >>
> >> Thanks for the reply, Magnus.
> >>
> >> I'm sorry it wasn't more clear in the original message. I have
> >> added the aws dependencies and set up the pipeline options with the
> >> aws options. For the case where I set the write to ignore
> >> windowing, everything works. But the option is deprecated and the
> >> comments warn against its usage.
> >>
> >> I'm wondering whether, in the case where no options are set and I
> >> see the error, it is a case of improperly initialized filesystems
> >> in the flink runner. Or maybe someone has some different ideas for
> >> the culprit.
> >>
> >>
> >>
> >>
> ------------------------------------------------------------------------
> >> *From:* Magnus Runesson <magru@linuxalert.org
> >> <ma...@linuxalert.org>>
> >> *Sent:* Saturday, September 21, 2019 9:06:03 AM
> >> *To:* user@beam.apache.org <ma...@beam.apache.org>
> >> <user@beam.apache.org <ma...@beam.apache.org>>
> >> *Subject:* Re: No filesystem found for scheme s3 using FileIO
> >>
> >> Hi!
> >>
> >>
> >> You are probably missing the S3 filesystem from your classpath.
> >>
> >> If I remember correctly you must include this
> >>
> >>
> >> https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
> >>
> >> package in your classpath/fat-jar.
> >>
> >> /Magnus
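For background on why a missing jar manifests this way: Beam discovers filesystem implementations on the classpath via java.util.ServiceLoader (through FileSystemRegistrar), so if the beam-sdks-java-io-amazon-web-services jar is not in the fat-jar, the s3 scheme is simply never offered. The JDK's own NIO filesystem providers use the same discovery mechanism, which this self-contained sketch probes:

```java
import java.nio.file.spi.FileSystemProvider;

// Lists the filesystem providers the JDK discovered on the classpath.
// Beam's FileSystemRegistrar discovery works analogously: no jar on
// the classpath means no provider, hence no scheme.
public class ProviderProbe {
  public static void main(String[] args) {
    for (FileSystemProvider p : FileSystemProvider.installedProviders()) {
      System.out.println(p.getScheme()); // e.g. "file", "jar"
    }
  }
}
```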
> >>
> >> On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
> >>> Hello everyone. I’m getting the following error when attempting to
> >>> use the FileIO apis (beam-2.15.0) and integrating with a 3rd party
> >>> filesystem, in this case AWS S3:
> >>>
> >>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> >>>   at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> >>>   at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> >>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >>>   at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
> >>>   at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >>>   at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> >>>   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> >>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>   at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
> >>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> >>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >>>   at java.lang.Thread.run(Thread.java:748)
> >>>
> >>> For reference, the write code resembles this:
> >>>
> >>> FileIO.Write<?, GenericRecord> write =
> >>>     FileIO.<GenericRecord>write()
> >>>         .via(ParquetIO.sink(schema))
> >>>         .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
> >>>         .withSuffix(".parquet");
> >>>
> >>> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
> >>>
> >>> I have set up the PipelineOptions with all the relevant AWS
> >>> options and the issue does not appear to be related to
> >>> ParquetIO.sink() directly. I am able to reliably reproduce the
> >>> issue using JSON formatted records and TextIO.sink(), as well.
> >>>
> >>> Just trying some different knobs, I went ahead and set the
> >>> following option:
> >>>
> >>> write = write.withNoSpilling();
> >>>
> >>> This actually seemed to fix the issue, only to have it reemerge
> >>> as I scaled up the data set size. The stack trace, while very
> >>> similar, reads:
> >>>
> >>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> >>>   at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> >>>   at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> >>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >>>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> >>>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >>>   at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> >>>   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> >>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>   at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
> >>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> >>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >>>   at java.lang.Thread.run(Thread.java:748)
> >>>
> >>> I’ll be interested to hear some theories on the
> >>> differences/similarities in the stacks. And lastly, I tried
> >>> adding the following deprecated option (with and without the
> >>> withNoSpilling() option):
> >>>
> >>> write = write.withIgnoreWindowing();
> >>>
> >>> This seemed to fix the issue altogether, but aside from having to
> >>> rely on a deprecated feature, there is the bigger question of why.
> >>>
> >>> In reading through some of the source, it seems a common pattern
> >>> to have to manually register the pipeline options to seed the
> >>> filesystem registry during the setup part of the operator
> >>> lifecycle, e.g.:
> >>> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
> >>>
> >>> Is it possible that I have hit upon a couple of scenarios where
> >>> that has not taken place? Unfortunately, I’m not yet in a position
> >>> to suggest a fix, but I’m guessing there’s some missing
> >>> initialization code in one or more of the batch operators. If this
> >>> is indeed a legitimate issue, I’ll be happy to log an issue, but
> >>> I’ll hold off until the community gets a chance to look at it.
> >>>
> >>> Thanks,
> >>> Preston
> >>>
> >>> CONFIDENTIALITY NOTICE This message and any included
> attachments
> >>> are from Cerner Corporation and are intended only for the
> >>> addressee. The information contained in this message is
> >>> confidential and may constitute inside or non-public
> information
> >>> under international, federal, or state securities laws.
> >>> Unauthorized forwarding, printing, copying, distribution, or
> use
> >>> of such information is strictly prohibited and may be
> unlawful. If
> >>> you are not the addressee, please promptly delete this message
> and
> >>> notify the sender of the delivery error by e-mail or you may
> call
> >>> Cerner's corporate offices in Kansas City, Missouri, U.S.A at
> (+1)
> >>> (816)221-1024.
> >>>
>
>
>
>
>
Re: No filesystem found for scheme s3 using FileIO
Posted by Maximilian Michels <mx...@apache.org>.
Hi Maulik,
Thanks for reporting.
Yes, there is still a chance this bug will occur in batch mode. David
(CC) has filed an issue for this:
https://issues.apache.org/jira/browse/BEAM-8577
The current approach to let every operator ensure FileSystems is
initialized does not always work if we have a shuffle operation. The
reason is that the receiving layer may attempt to decode data using a
coder which relies on FileSystems _before_ the following operator has
been initialized.
I think we have to go with a fix that initializes FileSystems in the
coder wrapper (CoderTypeInformation). Perhaps we can liven up the
current PR: https://github.com/apache/beam/pull/10027
Cheers,
Max
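The fix direction described in this reply (moving the FileSystems initialization into the coder wrapper) can be illustrated with a minimal stand-alone sketch; the class and method names here are invented for illustration and are not Beam's actual CoderTypeSerializer/CoderTypeInformation implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: the serializer wrapper itself guarantees one-time
// initialization before any decode, so it no longer matters which
// operator (or shuffle receiver) touches the coder first.
// Names are illustrative, not Beam's actual API.
public class InitializingSerializer {
  private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
  private static Runnable initAction = () -> {}; // stand-in for FileSystems setup

  static void setInitAction(Runnable action) {
    initAction = action;
  }

  // Every deserialize call first ensures initialization has happened,
  // exactly once per JVM, regardless of operator startup order.
  public static String deserialize(byte[] data) {
    if (INITIALIZED.compareAndSet(false, true)) {
      initAction.run();
    }
    return new String(data, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    setInitAction(() -> System.out.println("filesystems registered"));
    System.out.println(deserialize("hello".getBytes(StandardCharsets.UTF_8)));
  }
}
```

This matters precisely because, as noted above, the decode can happen in the shuffle-receiving layer before any following operator's own setup hook has run.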
On 11.12.19 21:59, Maulik Gandhi wrote:
> Hi Beam Team,
>
> I have a data pipeline (Beam on Flink running on YARN on AWS EMR), which
> reads some data and does a simple filtering operation and writes the
> data to data source S3.
>
> *Components and Versions:*
> - Beam: 2.16.0 (branch: release-2.16.0)
> - Flink: 1.8
> - YARN on AWS EMR: emr-5.26.0
>
> Below is a snippet of code
>
> PCollection<SomeType> someTypes = pipeline.apply(new ReadLatestSomeType());
> PCollection<SomeTypeValue> someTypesOutput =
> someTypes.apply(
> Filter.by(
> someTypeElement -> {
> if (some condition) {
> return false;
> }
> return true;
> }));
>
> someTypesOutput
> .apply(
> AvroIO.write(SomeType.class).to(options.getDestination().get()).withOutputFilenames())
> .getPerDestinationOutputFilenames();
>
>
> Below is the exception, I see on Flink job
>
> java.lang.IllegalArgumentException: No filesystem foundfor scheme s3
> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
> at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
> at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
> at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
> at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:106)
>
>
> My Beam pipeline is failing randomly with the exception I have listed
> above. I saw that the fix was made available on
> the branch: release-2.16.0 (comment on JIRA:
> https://issues.apache.org/jira/browse/BEAM-8303) which I have checked
> out locally. I did a custom build of the Apache/Beam
> (branch: release-2.16.0) and made sure the artifacts were published to
> the local maven repository so that I can use the artifacts on my
> project. I imported Beam artifacts (2.16.0-MG-SNAPSHOT) in my project
> and build my project. Once I had the compiled JAR, I kicked off Flink
> jobs on an AWS EMR cluster. The Flink job will give fail/success
> randomly (non-deterministic), without any change in the way I run the
> command, or re-building my artifact.
>
> I have noticed if I run the Flink Job with the parallelism of 1, it will
> not fail, but if I run the same job with the parallelism of 5 it can
> fail or succeed.
>
> If anyone can please help me out or give me directions, I could try out,
> it will be greatly appreciated.
>
> Thanks.
> - Maulik
>
> On Wed, Sep 25, 2019 at 10:53 AM Koprivica,Preston Blake
> <Preston.B.Koprivica@cerner.com <ma...@cerner.com>>
> wrote:
>
> Not a problem! Thanks for looking into this. In reading through
> the source associated with the stacktrace, I also noticed that
> there's neither user-code, nor beam-to-flink lifecycle code
> available for initialization. As far as I could tell, it was pure
> flink down to the coders. Nothing new here, but maybe it bolsters
> confidence in your diagnosis. I went ahead and logged an issue
> here: https://issues.apache.org/jira/browse/BEAM-8303.
>
> Let me know what I can do to help - I'm happy to test/verify any
> fixes you want to try and review any code (bearing in mind I'm a
> total newb in the beam space).
>
> Thanks again,
> Preston
>
> On 9/25/19, 10:34 AM, "Maximilian Michels" <mxm@apache.org
> <ma...@apache.org>> wrote:
>
> Hi Preston,
>
> Sorry about the name mixup, of course I meant to write Preston not
> Magnus :) See my reply below.
>
> cheers,
> Max
>
> On 25.09.19 08:31, Maximilian Michels wrote:
> > Hi Magnus,
> >
> > Your observation seems to be correct. There is an issue with
> the file
> > system registration.
> >
> > The two types of errors you are seeing, as well as the
> successful run,
> > are just due to the different structure of the generated
> transforms. The
> > Flink scheduler will distribute them differently, which
> results in some
> > pipelines being placed on task managers which happen to
> execute the
> > FileSystems initialization code and others not.
> >
> > There is a quick fix to at least initialize the file system
> in case it
> > has not been initialized, by adding the loading code here:
> >
> https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2F948c6fae909685e09d36b23be643182b34c8df25%2Fsdks%2Fjava%2Fcore%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2FFileSystems.java%23L463&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Ccaa86b8ee6f740a8c3cc08d741cdd982%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637050224680782231&sdata=n9zr0jMao6oGcSK0G3QSPcHbcfdZlUAGwKhWCpdKT6Y%3D&reserved=0
> >
> >
> > However, there we do not have the pipeline options available,
> which
> > prevents any configuration. The problem is that the error
> occurs in the
> > coder used in a native Flink operation which does not even
> run user code.
> >
> > I believe the only way fix this is to ship the FileSystems
> > initialization code in CoderTypeSerializer where we are sure
> to execute
> > it in time for any coders which depend on it.
> >
> > Could you file an issue? I'd be happy to fix this then.
> >
> > Thanks,
> > Max
> >
> > On 24.09.19 09:54, Chamikara Jayalath wrote:
> >> As Magnus mentioned, FileSystems are picked up from the
> class path and
> >> registered here.
> >>
> https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Fsdks%2Fjava%2Fcore%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Fsdk%2Fio%2FFileSystems.java%23L480&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Ccaa86b8ee6f740a8c3cc08d741cdd982%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637050224680782231&sdata=RPVRS548zo%2FWdf672K%2FKbqJykjf%2FuWyS84CirzR4b0E%3D&reserved=0
> >>
> >>
> >> Seems like Flink is invoking this method at following locations.
> >>
> https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Frunners%2Fflink%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fflink%2FFlinkPipelineRunner.java%23L142&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Ccaa86b8ee6f740a8c3cc08d741cdd982%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637050224680782231&sdata=jiMBDlzvSX2rLY9tZOLtt6QKAmCPTiglWezLKymxJAc%3D&reserved=0
> >>
> >>
> https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fblob%2Fmaster%2Frunners%2Fflink%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbeam%2Frunners%2Fflink%2FFlinkJobServerDriver.java%23L63&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Ccaa86b8ee6f740a8c3cc08d741cdd982%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637050224680782231&sdata=7hBfpu2mrAltIFCiq98hx5kp%2BLN8XYnavbU%2FKzHudnI%3D&reserved=0
> >>
> >>
> >> I'm not too familiar about Flink sure why S3 is not properly
> being
> >> registered when running the Flink job. Ccing some folks who
> are more
> >> familiar about Flink.
> >>
> >> +Ankur Goenka <mailto:goenka@google.com
> <ma...@google.com>> +Maximilian Michels
> >> <mailto:mxm@apache.org <ma...@apache.org>>
> >>
> >> Thanks,
> >> Cham
> >>
> >>
> >> On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake
> >> <Preston.B.Koprivica@cerner.com
> <ma...@cerner.com>
> >> <mailto:Preston.B.Koprivica@cerner.com
> <ma...@cerner.com>>> wrote:
> >>
> >> Thanks for the reply Magnus.
> >>
> >> I'm sorry it wasn't more clear in the original message.
> I have
> >> added the aws dependencies and set up the pipeline
> options with the
> >> aws options. For the case where I set the write to ignore
> >> windowing, everything works. But the option is
> deprecated and the
> >> comments warn against its usage.
> >>
> >> I'm wondering if where no options are set and I see the
> error that
> >> that is a case of improperly initialized filesystems in
> the flink
> >> runner. Or maybe someone has some different ideas for
> the culprit.
> >>
> >> Get Outlook for Android
> <https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Faka.ms%2Fghei36&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Ccaa86b8ee6f740a8c3cc08d741cdd982%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637050224680782231&sdata=AGwyaD%2Bn8MQJlHqYjsefmUDcrgMifcFUVx%2BR%2BOty5Xo%3D&reserved=0>
> >>
> >>
> >>
> ------------------------------------------------------------------------
> >> *From:* Magnus Runesson <magru@linuxalert.org
> <ma...@linuxalert.org>
> >> <mailto:magru@linuxalert.org <ma...@linuxalert.org>>>
> >> *Sent:* Saturday, September 21, 2019 9:06:03 AM
> >> *To:* user@beam.apache.org <ma...@beam.apache.org>
> <mailto:user@beam.apache.org <ma...@beam.apache.org>>
> >> <user@beam.apache.org <ma...@beam.apache.org>
> <mailto:user@beam.apache.org <ma...@beam.apache.org>>>
> >> *Subject:* Re: No filesystem found for scheme s3 using
> FileIO
> >>
> >> Hi!
> >>
> >>
> >> You probably miss the S3 filesystem in your classpath.
> >>
> >> If I remember correctly you must include this
> >>
> >>
> https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsearch.maven.org%2Fsearch%3Fq%3Da%3Abeam-sdks-java-io-amazon-web-services&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Ccaa86b8ee6f740a8c3cc08d741cdd982%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637050224680782231&sdata=K%2FGNviln25aCjv1biaVakcPpvGEgmAzB%2FggezNGiSOo%3D&reserved=0
> >>
> >>
> <https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fsearch.maven.org%2Fsearch%3Fq%3Da%3Abeam-sdks-java-io-amazon-web-services&data=02%7C01%7CPreston.B.Koprivica%40cerner.com%7Ccaa86b8ee6f740a8c3cc08d741cdd982%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637050224680792233&sdata=ZLJ7QmRWC8XkouQ7r47Ze%2Fy2pxN%2Bcttoi70ilOsJNG0%3D&reserved=0>
> >>
> >> package in your classpath/fat-jar.
> >>
> >> /Magnus
> >>
> >> On 2019-09-19 23:13, Koprivica,Preston Blake wrote:
> >>>
> >>> Hello everyone. I’m getting the following error when attempting to
> >>> use the FileIO apis (beam-2.15.0) and integrating with a 3rd party
> >>> filesystem, in this case AWS S3:
> >>>
> >>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> >>>   at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> >>>   at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> >>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >>>   at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
> >>>   at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >>>   at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> >>>   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> >>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>   at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
> >>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> >>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >>>   at java.lang.Thread.run(Thread.java:748)
> >>>
> >>> For reference, the write code resembles this:
> >>>
> >>> FileIO.Write<?, GenericRecord> write =
> >>>     FileIO.<GenericRecord>write()
> >>>         .via(ParquetIO.sink(schema))
> >>>         .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
> >>>         .withSuffix(".parquet");
> >>>
> >>> records.apply(String.format("Write(%s)", options.getOutputDir()), write);
> >>>
> >>> I have set up the PipelineOptions with all the relevant AWS options,
> >>> and the issue does not appear to be related to ParquetIO.sink()
> >>> directly. I am able to reliably reproduce the issue using JSON
> >>> formatted records and TextIO.sink(), as well.
> >>>
> >>> Just trying some different knobs, I went ahead and set the
> >>> following option:
> >>>
> >>> write = write.withNoSpilling();
> >>>
> >>>
> >>> This actually seemed to fix the issue, only to have it reemerge as
> >>> I scaled up the data set size. The stack trace, while very
> >>> similar, reads:
> >>>
> >>> java.lang.IllegalArgumentException: No filesystem found for scheme s3
> >>>   at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
> >>>   at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
> >>>   at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
> >>>   at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >>>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> >>>   at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >>>   at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >>>   at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
> >>>   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
> >>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>   at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
> >>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
> >>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >>>   at java.lang.Thread.run(Thread.java:748)
> >>>
> >>>
> >>> I’ll be interested to hear some theories on the
> >>> differences/similarities in the stacks. And lastly, I tried
> >>> adding the following deprecated option (with and without the
> >>> withNoSpilling() option):
> >>>
> >>> write = write.withIgnoreWindowing();
> >>>
> >>> This seemed to fix the issue altogether but aside from having to
> >>> rely on a deprecated feature, there is the bigger issue of why?
> >>>
> >>>
> >>> In reading through some of the source, it seems a common pattern
> >>> to have to manually register the pipeline options to seed the
> >>> filesystem registry during the setup part of the operator
> >>> lifecycle, e.g.:
> >>>
> >>> https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
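
For anyone skimming the thread: the registration that the linked DoFnOperator code performs boils down to a single Beam call, FileSystems.setDefaultPipelineOptions. A minimal sketch of the pattern follows; the surrounding class and method names are hypothetical (this is not Beam's actual operator code), and it assumes the Beam SDK jars are on the classpath:

```java
// Hypothetical operator-setup hook illustrating the registration pattern
// used by DoFnOperator. Not Beam source code; a sketch only.
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;

class OperatorSetupSketch {
  void open(PipelineOptions options) {
    // Re-registers every FileSystem implementation found on the worker
    // JVM's classpath (including the S3 one, if the
    // beam-sdks-java-io-amazon-web-services jar is bundled), so that
    // coders calling FileSystems.matchNewResource during deserialization
    // can resolve the "s3" scheme.
    FileSystems.setDefaultPipelineOptions(options);
  }
}
```

If a batch operator's setup path never makes this call, any coder that touches FileSystems during decode will fail exactly as in the traces above, which would be consistent with the reported behavior.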
> >>>
> >>>
> >>> Is it possible that I have hit upon a couple scenarios where that
> >>> has not taken place? Unfortunately, I’m not yet in a position to
> >>> suggest a fix, but I’m guessing there’s some missing
> >>> initialization code in one or more of the batch operators. If
> >>> this is indeed a legitimate issue, I’ll be happy to log an issue,
> >>> but I’ll hold off until the community gets a chance to look at it.
> >>>
> >>>
> >>> Thanks,
> >>>
> >>> Preston
> >>>
> >>> CONFIDENTIALITY NOTICE This message and any included
> attachments
> >>> are from Cerner Corporation and are intended only for the
> >>> addressee. The information contained in this message is
> >>> confidential and may constitute inside or non-public
> information
> >>> under international, federal, or state securities laws.
> >>> Unauthorized forwarding, printing, copying,
> distribution, or use
> >>> of such information is strictly prohibited and may be
> unlawful. If
> >>> you are not the addressee, please promptly delete this
> message and
> >>> notify the sender of the delivery error by e-mail or
> you may call
> >>> Cerner's corporate offices in Kansas City, Missouri,
> U.S.A at (+1)
> >>> (816)221-1024.
> >>>
>
>
>
>
>