You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Juan Carlos Garcia <jc...@gmail.com> on 2018/10/26 09:47:15 UTC

No filesystem found for scheme hdfs - with the FlinkRunner

Hi Folks,

I have a strange situation while running beam 2.7.0 with the FlinkRunner,
my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its
checkpoint. Flink is able to correctly writes its checkpoint / savepoint to
HDFS without any problems.

However, my pipeline has to write to HDFS as well, but fails with "Caused
by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
(stacktrace at the bottom)

In the host where the pipeline is running:
1. The environment variable HADOOP_CONF_DIR is set.
2. During my pipeline construction i am explicitly calling
FileSystems.setDefaultPipelineOptions(_options); to trigger the
ServiceLoader to find all options registrar from the classpath
3. If i explore the property SCHEME_TO_FILESYSTEM of the class FileSystems
in my main method using reflection i am able to see that at launch time it
contains:
   {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}

Any idea what i am doing wrong with the HDFS integration?

{snippet}

FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
                            Field f =
FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
                            f.setAccessible(true);
                            AtomicReference<Map<String, FileSystem>> value
= (AtomicReference<Map<String, FileSystem>>) f.get(null);

System.out.println("===========================");
                            System.out.println(value);
{snippet}

{stacktrace}
Caused by: java.lang.IllegalArgumentException: No filesystem found for
scheme hdfs
        at
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
        at
org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
        at
org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
        at
org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
        at
org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
        at
org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
        at
org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
        at
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)

{stacktrace}

-- 

JC



-- 

JC

Re: No filesystem found for scheme hdfs - with the FlinkRunner

Posted by Tim <ti...@gmail.com>.
Thanks for sharing that

Tim,
Sent from my iPhone

> On 26 Oct 2018, at 17:50, Juan Carlos Garcia <jc...@gmail.com> wrote:
> 
> Just for everyone to know we figure it out, it was an environment problem.
> 
> In our case we have our cluster in a network that is not accessible directly, so to deploy we need to uses Jenkins  with some slaves that have access to that network.
> 
> During deployment in the main method of the class we execute FileSystems.setDefaultPipelineOptions(_options); which trigger the HadoopFileSystemOptionsRegistrar via the ServiceLoader mechanism and this access the environment variable HADOOP_CONF_DIR in order to correctly register the Filesystem.
> 
> SO, its very important that the machine you are using for deployment have that Environment variable set as well (not only the worker where the pipeline will run).
> 
> In our case the variable was set on the .bashrc of the user used for deployment, but here is the catch.
> 
> We were using "sudo -u DEPLOYMENT_USER -s /var/lib/flink/bin/flink run -d .........", but the flag -s do not execute the user .bashrc (.bash_profile), hence we have failures at runtime. The fix was just replacing -s flag with -i to make sure the environment variable is present when the command to run works.
> 
> Thanks
> 
> 
>> On Fri, Oct 26, 2018 at 1:52 PM Juan Carlos Garcia <jc...@gmail.com> wrote:
>> Hi Tim,
>> 
>> I am using FileIO directly with the AvroIO.sink(...), however having experienced BEAM-2277 with the SparkRunner few months ago, i got the feeling this is something different (maybe some dependency mismatch/missing).
>> 
>> Thanks
>> 
>>> On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson <ti...@gmail.com> wrote:
>>> Hi Juan
>>> 
>>> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277 which we believed fixed in 2.7.0.
>>> What IO are you using to write your files and can you paste a snippet of your code please?
>>> 
>>> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a workaround too):
>>> 
>>>  transform.apply("Write",
>>>         AvroIO.writeGenericRecords(schema)
>>>             .to(FileSystems.matchNewResource(options.getTarget(),true))
>>>             // BEAM-2277 workaround
>>>             .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true)));
>>> 
>>> Thanks
>>> Tim
>>> 
>>> 
>>>> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jc...@gmail.com> wrote:
>>>> Hi Folks,
>>>> 
>>>> I have a strange situation while running beam 2.7.0 with the FlinkRunner, my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its checkpoint. Flink is able to correctly writes its checkpoint / savepoint to HDFS without any problems.
>>>> 
>>>> However, my pipeline has to write to HDFS as well, but fails with "Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
>>>> (stacktrace at the bottom)
>>>> 
>>>> In the host where the pipeline is running:
>>>> 1. The environment variable HADOOP_CONF_DIR is set.
>>>> 2. During my pipeline construction i am explicitly calling FileSystems.setDefaultPipelineOptions(_options); to trigger the ServiceLoader to find all options registrar from the classpath
>>>> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class FileSystems in my main method using reflection i am able to see that at launch time it contains:
>>>>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff, hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>>> 
>>>> Any idea what i am doing wrong with the HDFS integration?
>>>>    
>>>> {snippet}
>>>>                             FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>>>>                             Field f = FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>>>>                             f.setAccessible(true);
>>>>                             AtomicReference<Map<String, FileSystem>> value = (AtomicReference<Map<String, FileSystem>>) f.get(null);
>>>>                             System.out.println("===========================");
>>>>                             System.out.println(value);
>>>> {snippet}
>>>> 
>>>> {stacktrace}
>>>> Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>>>>         at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>>>         at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>>>         at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>>>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>>         at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>>         at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>>>>         at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>>> 
>>>> {stacktrace}
>>>> 
>>>> -- 
>>>> 
>>>> JC 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> 
>>>> JC 
>>>> 
>> 
>> 
>> -- 
>> 
>> JC 
>> 
> 
> 
> -- 
> 
> JC 
> 

Re: No filesystem found for scheme hdfs - with the FlinkRunner

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Just for everyone to know we figure it out, it was an environment problem.

In our case we have our cluster in a network that is not accessible
directly, so to deploy we need to uses Jenkins  with some slaves that have
access to that network.

During deployment in the *main* method of the class we execute
*FileSystems.**setDefaultPipelineOptions(_**options);* which trigger
the *HadoopFileSystemOptionsRegistrar
*via the ServiceLoader mechanism and this access the environment
variable *HADOOP_CONF_DIR
*in order to correctly register the Filesystem.

SO, its very important that the machine you are using for deployment have
that Environment variable set as well (not only the worker where the
pipeline will run).

In our case the variable was set on the .bashrc of the user used for
deployment, but here is the catch.

We were using "sudo -u DEPLOYMENT_USER *-s* /var/lib/flink/bin/flink run -d
.........", but the flag *-s *do not execute the user .bashrc
(.bash_profile), hence we have failures at runtime. The fix was just
replacing *-s *flag with *-i *to make sure the environment variable is
present when the command to run works.

Thanks


On Fri, Oct 26, 2018 at 1:52 PM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> Hi Tim,
>
> I am using FileIO directly with the AvroIO.sink(...), however having
> experienced BEAM-2277 with the SparkRunner few months ago, i got the
> feeling this is something different (maybe some dependency
> mismatch/missing).
>
> Thanks
>
> On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson <ti...@gmail.com>
> wrote:
>
>> Hi Juan
>>
>> This sounds reminiscent of
>> https://issues.apache.org/jira/browse/BEAM-2277 which we believed fixed
>> in 2.7.0.
>> What IO are you using to write your files and can you paste a snippet of
>> your code please?
>>
>> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
>> workaround too):
>>
>>  transform.apply("Write",
>>         AvroIO.writeGenericRecords(schema)
>>             .to(FileSystems.matchNewResource(options.getTarget(),true))
>>             // BEAM-2277 workaround            .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true)));
>>
>>
>> Thanks
>> Tim
>>
>>
>> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jc...@gmail.com>
>> wrote:
>>
>>> Hi Folks,
>>>
>>> I have a strange situation while running beam 2.7.0 with the
>>> FlinkRunner, my setup consist of a HA Flink cluster (1.5.4) writing to HDFS
>>> its checkpoint. Flink is able to correctly writes its checkpoint /
>>> savepoint to HDFS without any problems.
>>>
>>> However, my pipeline has to write to HDFS as well, but fails with
>>> "Caused by: java.lang.IllegalArgumentException: No filesystem found for
>>> scheme hdfs"
>>> (stacktrace at the bottom)
>>>
>>> In the host where the pipeline is running:
>>> 1. The environment variable HADOOP_CONF_DIR is set.
>>> 2. During my pipeline construction i am explicitly calling
>>> FileSystems.setDefaultPipelineOptions(_options); to trigger the
>>> ServiceLoader to find all options registrar from the classpath
>>> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class
>>> FileSystems in my main method using reflection i am able to see that at
>>> launch time it contains:
>>>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>>
>>> Any idea what i am doing wrong with the HDFS integration?
>>>
>>> {snippet}
>>>
>>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>>>                             Field f =
>>> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>>>                             f.setAccessible(true);
>>>                             AtomicReference<Map<String, FileSystem>>
>>> value = (AtomicReference<Map<String, FileSystem>>) f.get(null);
>>>
>>> System.out.println("===========================");
>>>                             System.out.println(value);
>>> {snippet}
>>>
>>> {stacktrace}
>>> Caused by: java.lang.IllegalArgumentException: No filesystem found for
>>> scheme hdfs
>>>         at
>>> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>>         at
>>> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>>         at
>>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>>>         at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>         at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>         at
>>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>>         at
>>> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>>>         at
>>> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>>
>>> {stacktrace}
>>>
>>> --
>>>
>>> JC
>>>
>>>
>>>
>>> --
>>>
>>> JC
>>>
>>>
>
> --
>
> JC
>
>

-- 

JC

Re: No filesystem found for scheme hdfs - with the FlinkRunner

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Hi Tim,

I am using FileIO directly with the AvroIO.sink(...), however having
experienced BEAM-2277 with the SparkRunner few months ago, i got the
feeling this is something different (maybe some dependency
mismatch/missing).

Thanks

On Fri, Oct 26, 2018 at 1:33 PM Tim Robertson <ti...@gmail.com>
wrote:

> Hi Juan
>
> This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277
> which we believed fixed in 2.7.0.
> What IO are you using to write your files and can you paste a snippet of
> your code please?
>
> On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
> workaround too):
>
>  transform.apply("Write",
>         AvroIO.writeGenericRecords(schema)
>             .to(FileSystems.matchNewResource(options.getTarget(),true))
>             // BEAM-2277 workaround            .withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro", true)));
>
>
> Thanks
> Tim
>
>
> On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jc...@gmail.com>
> wrote:
>
>> Hi Folks,
>>
>> I have a strange situation while running beam 2.7.0 with the FlinkRunner,
>> my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its
>> checkpoint. Flink is able to correctly writes its checkpoint / savepoint to
>> HDFS without any problems.
>>
>> However, my pipeline has to write to HDFS as well, but fails with "Caused
>> by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
>> (stacktrace at the bottom)
>>
>> In the host where the pipeline is running:
>> 1. The environment variable HADOOP_CONF_DIR is set.
>> 2. During my pipeline construction i am explicitly calling
>> FileSystems.setDefaultPipelineOptions(_options); to trigger the
>> ServiceLoader to find all options registrar from the classpath
>> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class
>> FileSystems in my main method using reflection i am able to see that at
>> launch time it contains:
>>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
>> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>>
>> Any idea what i am doing wrong with the HDFS integration?
>>
>> {snippet}
>>
>> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>>                             Field f =
>> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>>                             f.setAccessible(true);
>>                             AtomicReference<Map<String, FileSystem>>
>> value = (AtomicReference<Map<String, FileSystem>>) f.get(null);
>>
>> System.out.println("===========================");
>>                             System.out.println(value);
>> {snippet}
>>
>> {stacktrace}
>> Caused by: java.lang.IllegalArgumentException: No filesystem found for
>> scheme hdfs
>>         at
>> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>>         at
>> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>>         at
>> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>>         at
>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>         at
>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>         at
>> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>>         at
>> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>>         at
>> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>>
>> {stacktrace}
>>
>> --
>>
>> JC
>>
>>
>>
>> --
>>
>> JC
>>
>>

-- 

JC

Re: No filesystem found for scheme hdfs - with the FlinkRunner

Posted by Tim Robertson <ti...@gmail.com>.
Hi Juan

This sounds reminiscent of https://issues.apache.org/jira/browse/BEAM-2277
which we believed fixed in 2.7.0.
What IO are you using to write your files and can you paste a snippet of
your code please?

On BEAM-2277 I posted a workaround for AvroIO (it might help you find a
workaround too):

 transform.apply("Write",
        AvroIO.writeGenericRecords(schema)
            .to(FileSystems.matchNewResource(options.getTarget(),true))
            // BEAM-2277 workaround
.withTempDirectory(FileSystems.matchNewResource("hdfs://ha-nn/tmp/beam-avro",
true)));


Thanks
Tim


On Fri, Oct 26, 2018 at 11:47 AM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> Hi Folks,
>
> I have a strange situation while running beam 2.7.0 with the FlinkRunner,
> my setup consist of a HA Flink cluster (1.5.4) writing to HDFS its
> checkpoint. Flink is able to correctly writes its checkpoint / savepoint to
> HDFS without any problems.
>
> However, my pipeline has to write to HDFS as well, but fails with "Caused
> by: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs"
> (stacktrace at the bottom)
>
> In the host where the pipeline is running:
> 1. The environment variable HADOOP_CONF_DIR is set.
> 2. During my pipeline construction i am explicitly calling
> FileSystems.setDefaultPipelineOptions(_options); to trigger the
> ServiceLoader to find all options registrar from the classpath
> 3. If i explore the property SCHEME_TO_FILESYSTEM of the class FileSystems
> in my main method using reflection i am able to see that at launch time it
> contains:
>    {file=org.apache.beam.sdk.io.LocalFileSystem@1941a8ff,
> hdfs=org.apache.beam.sdk.io.hdfs.HadoopFileSystem@22d7b4f8}
>
> Any idea what i am doing wrong with the HDFS integration?
>
> {snippet}
>
> FileSystems.setDefaultPipelineOptions(_context.getPipelineOptions());
>                             Field f =
> FileSystems.class.getDeclaredField("SCHEME_TO_FILESYSTEM");
>                             f.setAccessible(true);
>                             AtomicReference<Map<String, FileSystem>> value
> = (AtomicReference<Map<String, FileSystem>>) f.get(null);
>
> System.out.println("===========================");
>                             System.out.println(value);
> {snippet}
>
> {stacktrace}
> Caused by: java.lang.IllegalArgumentException: No filesystem found for
> scheme hdfs
>         at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
>         at
> org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
>         at
> org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink.lambda$new$22b9c623$1(FileIO.java:1293)
>         at
> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>         at
> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>         at
> org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
>         at
> org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:920)
>         at
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:715)
>
> {stacktrace}
>
> --
>
> JC
>
>
>
> --
>
> JC
>
>