Posted to user@flink.apache.org by Yan Yan <ya...@gmail.com> on 2018/10/03 23:18:02 UTC

HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

Hi,

We recently bumped from Flink 1.3.2 to Flink 1.4 and found an issue with
HDFS configuration.

We are using *FlinkYarnSessionCli* to start the cluster and submit the job.

In 1.3.2, we set the following Flink properties when using checkpoints:
state.backend.fs.checkpointdir = hdfs://nameservice0/.../..
state.checkpoints.dir = hdfs://nameservice0/../..

The mapping from the logical nameservice (nameservice0) to the actual
namenode host:ports is passed to Flink via *yarnship/core-site.xml* (shipped
with the -yt option), and we set fs.hdfs.hadoopconf=yarnship/
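
For reference, the nameservice-to-namenode mapping is expressed with the
standard Hadoop HA client properties. A sketch of what the shipped file
would contain (host names here are placeholders, and depending on the
deployment these properties may live in core-site.xml or hdfs-site.xml):

```xml
<!-- Sketch of the HA mapping shipped via yarnship/ (placeholder hosts). -->
<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>nameservice0</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.nameservice0</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice0.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.nameservice0</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```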

However, after bumping to 1.4 we encountered the error below, which caused
checkpointing to fail.

2018-09-20 01:01:00.041 [yarn-jobmanager-io-thread-18] WARN
org.apache.flink.runtime.checkpoint.OperatorSubtaskState  - Error
while discarding operator states.
java.io.IOException: Cannot instantiate file system for URI:
hdfs://nameservice0/app/athena_checkpoint/rt_data/gairos-athena-flink-demand-processor-opticclient-flink-job/flink/checkpoints/ckpt_1537402213581/data/ebb30a2d3b26e4d63682538a9bcdc752/chk-3204/da1d44d3-d3eb-4e57-9145-bdf30c96993a
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.getFileSystem(FileStateHandle.java:109)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:81)
    at org.apache.flink.runtime.state.OperatorStateHandle.discardState(OperatorStateHandle.java:65)
    at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
    at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
    at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:207)
    at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:108)
    at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:51)
    at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:53)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint$1.run(PendingCheckpoint.java:530)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException:
java.net.UnknownHostException: nameservice0
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
    ... 15 common frames omitted
Caused by: java.net.UnknownHostException: nameservice0
    ... 22 common frames omitted


It does not recognize nameservice0 because the *core-site.xml* on the
actual machine (read by Flink via $HADOOP_CONF_DIR) uses something other
than nameservice0 for *fs.defaultFS*.

Digging a little, I found that the *hadoopConfig* (code
<https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L159>)
does not contain the properties we set via *yarnship/core-site.xml*. In
particular, I suspect the cached *HadoopFsFactory* is initialized with a
dummy Configuration (code
<https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387>),
which prevents the *flinkConfig* from being passed in later (code
<https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84>
).
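
To illustrate the kind of initialization-order pitfall suspected here, a
self-contained sketch (not Flink's actual code; all names are invented): a
factory cached on first use with a default, empty configuration keeps
ignoring any configuration that is only handed to later lookups.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the suspected pitfall: a factory cached with an
// empty configuration never sees settings supplied on later lookups.
public class CachedFactoryPitfall {

    static class Factory {
        final Map<String, String> config;
        Factory(Map<String, String> config) { this.config = config; }
        // Returns null when no mapping is configured, mirroring how an
        // unconfigured HDFS client cannot resolve a logical nameservice.
        String resolve(String logicalName) {
            return config.get(logicalName);
        }
    }

    static Factory cachedFactory;

    public static Factory getFactory(Map<String, String> config) {
        if (cachedFactory == null) {
            cachedFactory = new Factory(config);  // cached on first use
        }
        return cachedFactory;                     // later configs ignored
    }

    public static void main(String[] args) {
        // First access happens early, before the real config is loaded.
        getFactory(new HashMap<>());

        // The real config arrives later, but the cached factory never sees it.
        Map<String, String> realConfig = new HashMap<>();
        realConfig.put("nameservice0", "namenode-a:8020,namenode-b:8020");
        Factory f = getFactory(realConfig);

        // Resolution fails: analogous to UnknownHostException: nameservice0.
        System.out.println(f.resolve("nameservice0"));  // prints "null"
    }
}
```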

I am not sure whether this is intentional, or whether it has been fixed in
later releases. Has anyone encountered the same problem? I would appreciate
any suggestions.

Thanks,
Yan

Re: HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for figuring this out, Shuyi!


Re: HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

Posted by Shuyi Chen <su...@gmail.com>.
I think the bug was introduced by FLINK-7643 (Rework FileSystem loading to
use factories). After that change, YarnApplicationMasterRunner no longer
initializes the FileSystem with the correct Flink configuration before
calling runApplicationMaster(). Without that initialization, a call to
FileSystem.get("hdfs://nameservice0/") causes the HadoopFsFactory to be
initialized with an empty Flink Configuration, so it cannot
recognize nameservice0.

I've written unit tests against YarnApplicationMasterRunner,
YarnTaskManagerRunnerFactory and YarnTaskExecutorRunner, and verified that
YarnApplicationMasterRunner fails to initialize the FileSystem properly,
while YarnTaskManagerRunnerFactory and YarnTaskExecutorRunner both do the
right thing.

I'll create a JIRA to track the fix.
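
The fix described above amounts to an ordering guarantee: install the real
configuration into the shared state eagerly, before any code path can
lazily seed it with defaults. A stdlib-only sketch of that pattern
(hypothetical names; Flink's actual API and call sites differ):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the ordering fix: an explicit initialize() installs the real
// configuration up front, so later lookups never fall back to an empty
// default configuration.
public class EagerInitSketch {

    private static Map<String, String> sharedConfig = new HashMap<>();

    // Must run at process startup, before any resolve() call.
    public static void initialize(Map<String, String> config) {
        sharedConfig = new HashMap<>(config);
    }

    public static String resolve(String logicalName) {
        return sharedConfig.get(logicalName);  // null if never initialized
    }

    public static void main(String[] args) {
        Map<String, String> realConfig = new HashMap<>();
        realConfig.put("nameservice0", "namenode-a:8020,namenode-b:8020");

        initialize(realConfig);  // the step the buggy runner skipped

        System.out.println(resolve("nameservice0"));
        // prints "namenode-a:8020,namenode-b:8020"
    }
}
```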


-- 
"So you have to trust that the dots will somehow connect in your future."

Re: HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

Posted by Yan Yan <ya...@gmail.com>.
Hi Aljoscha,

Thanks for looking into this! Yes, when we roll back from Flink 1.4 to
Flink 1.3.2 it works, so this does look like a genuine behavior difference
between 1.3.2 and 1.4.

Best,
Yan


Re: HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

Posted by Aljoscha Krettek <al...@apache.org>.
Another thing: when you retry this with Flink 1.3.2, does it work? I'm trying to rule out another problem in the setup.


Re: HDFS HA issue in Flink 1.4 caused by failing to use fs.hdfs.hadoopconf in Flink conf

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Yan,

This seems to be a bug in the FileSystems and how they're initialized. I'm looking into it myself, but I'm also looping in Stephan and Stefan, who have worked on this the most in the past. Maybe they have some valuable input.

Best,
Aljoscha

