Posted to user@flink.apache.org by Encho Mishinev <en...@gmail.com> on 2018/08/29 12:19:56 UTC

Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

Hello,

I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One of
my jobs involves reading from Google Cloud Storage which uses the file
scheme "gs://". Everything was fine but once in a while I would get an
exception that the scheme is not recognised. Now I've started seeing them
more often. It seems to be arbitrary - the exact same job with the exact
same parameters may finish successfully or throw this exception and fail
immediately. I can't figure out why it's not deterministic. Here is the
full exception logged upon the job failing:

java.lang.Exception: The data preparation for task 'GroupReduce
(GroupReduce at Match files from GCS/Via
MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an
error: Error obtaining the sorted input: Thread 'SortMerger Reading
Thread' terminated due to an exception: No filesystem found for scheme
gs
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted
input: Thread 'SortMerger Reading Thread' terminated due to an
exception: No filesystem found for scheme gs
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
	... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: No filesystem found for scheme gs
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
	at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
	at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
	at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
	at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
	at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
	at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)


Any ideas why the behaviour is not deterministic regarding recognising
file system schemes?
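
For reference, the failure seems to boil down to the call below (the
bucket and path are placeholders, not my real ones, and this assumes
the GCS filesystem module is on the classpath):

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class GsSchemeCheck {
      public static void main(String[] args) {
        // Registers all filesystems found on the classpath for these options.
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
        // This is the call from ResourceIdCoder.decode in the stack trace; it
        // throws IllegalArgumentException("No filesystem found for scheme gs")
        // when no filesystem is registered for "gs".
        FileSystems.matchNewResource("gs://example-bucket/some/path", false);
      }
    }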


Thanks,

Encho

Re: Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

Posted by Nico Kruber <ni...@data-artisans.com>.
Sorry, I had a second look and your stack trace does not even point to
the spilling channel - it reads from the memory segment directly.
-> Setting the temp dirs will thus not make a difference.

I'm wondering why your deserializer ends up referring to a file on
gs:// directly, instead of this happening in, for example, a follow-up
map operation.
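
One thing you could check on the taskmanagers that fail is what the
classpath actually provides. A rough sketch, assuming Beam's
ServiceLoader-based filesystem registration (the class name of the GCS
registrar may differ across Beam versions):

    import java.util.ServiceLoader;
    import org.apache.beam.sdk.io.FileSystemRegistrar;

    public class RegistrarDump {
      public static void main(String[] args) {
        // Beam discovers filesystems via ServiceLoader; if no registrar for
        // "gs" shows up here, this JVM can never resolve the scheme.
        for (FileSystemRegistrar registrar :
            ServiceLoader.load(FileSystemRegistrar.class)) {
          System.out.println(registrar.getClass().getName());
        }
      }
    }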

Nico

On 13/09/18 14:52, Encho Mishinev wrote:
> Hi Nico,
> 
> Unfortunately I can't share any of the data, but it is not even data being
> processed at the point of failure - it is still in the
> matching-files-from-GCS phase.
> 
> I am using Apache Beam's FileIO to match files and during one of those
> match-files steps I get the failure above.
> 
> Currently I run the job and when a taskmanager shows this error I reset
> it and restart the job. That works fine since the failure occurs at the
> beginning of the job only. It seems to be a problem within some
> taskmanagers, which is very odd considering that I have them all
> generated by a Kubernetes deployment, i.e. they should be completely
> identical. Sometimes I have to restart 3-4 of them until I have a
> running cluster.
> 
> I will try setting the temporary directory to something other than the
> default - it can't hurt.
> 
> Thanks for the help,
> Encho
> 
> On Thu, Sep 13, 2018 at 3:41 PM Nico Kruber <nico@data-artisans.com> wrote:
>
>     [...]

-- 
Nico Kruber | Software Engineer
data Artisans



Re: Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

Posted by Encho Mishinev <en...@gmail.com>.
Hi Nico,

Unfortunately I can't share any of the data, but it is not even data being
processed at the point of failure - it is still in the
matching-files-from-GCS phase.

I am using Apache Beam's FileIO to match files and during one of those
match-files steps I get the failure above.
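
The matching step itself is nothing special - roughly this, inside the
pipeline construction (the file pattern is a placeholder):

    import org.apache.beam.sdk.io.FileIO;

    // The step name is the one that shows up in the stack trace above.
    pipeline.apply("Match files from GCS",
        FileIO.match().filepattern("gs://my-bucket/input/*"));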

Currently I run the job and when a taskmanager shows this error I reset it
and restart the job. That works fine since the failure occurs at the
beginning of the job only. It seems to be a problem within some
taskmanagers, which is very odd considering that I have them all generated
by a Kubernetes deployment, i.e. they should be completely identical.
Sometimes I have to restart 3-4 of them until I have a running cluster.

I will try setting the temporary directory to something other than the
default - it can't hurt.

Thanks for the help,
Encho

On Thu, Sep 13, 2018 at 3:41 PM Nico Kruber <ni...@data-artisans.com> wrote:

> Hi Encho,
> the SpillingAdaptiveSpanningRecordDeserializer that you see in your
> stack trace is executed while reading input records from another task.
> If the (serialized) records are too large (> 5MiB), it will write and
> assemble them in a spilling channel, i.e. on disk, instead of using
> memory. This will use the temporary directories specified via
> "io.tmp.dirs" (or "taskmanager.tmp.dirs") which defaults to
> System.getProperty("java.io.tmpdir").
> -> These paths must actually be on an ordinary file system, not in gs://
> or so.
>
> The reason you only see this sporadically may be because not all your
> records are that big. It should, however, be deterministic in that it
> should always occur for the same record. Maybe something is wrong here
> and the record length is messed up, e.g. due to a bug in the
> de/serializer or the network stack.
>
> Do you actually have a minimal working example that you can share
> (either privately with me, or here) and that shows this error?
>
>
> Nico
>
> On 29/08/18 14:19, Encho Mishinev wrote:
> > [...]
>

Re: Filescheme GS not found sometimes - inconsistent exceptions for reading from GCS

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Encho,
the SpillingAdaptiveSpanningRecordDeserializer that you see in your
stack trace is executed while reading input records from another task.
If the (serialized) records are too large (> 5MiB), it will write and
assemble them in a spilling channel, i.e. on disk, instead of using
memory. This will use the temporary directories specified via
"io.tmp.dirs" (or "taskmanager.tmp.dirs") which defaults to
System.getProperty("java.io.tmpdir").
-> These paths must actually be on an ordinary file system, not in gs://
or so.
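
For example, in flink-conf.yaml (the path is only an illustration; a
comma-separated list of directories is also allowed):

    io.tmp.dirs: /mnt/flink-tmp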

The reason you only see this sporadically may be because not all your
records are that big. It should, however, be deterministic in that it
should always occur for the same record. Maybe something is wrong here
and the record length is messed up, e.g. due to a bug in the
de/serializer or the network stack.

Do you actually have a minimal working example that you can share
(either privately with me, or here) and that shows this error?


Nico

On 29/08/18 14:19, Encho Mishinev wrote:
> [...]

-- 
Nico Kruber | Software Engineer
data Artisans
