Posted to user@flink.apache.org by Nick Bendtner <bu...@gmail.com> on 2020/03/19 19:13:28 UTC
Help with flink hdfs sink
Hi guys,
I am using Flink version 1.7.2.
I am trying to write to an HDFS sink from my Flink job. I have set HADOOP_HOME.
Here is the debug log for this:
2020-03-19 18:59:34,316 DEBUG org.apache.flink.runtime.util.HadoopUtils - Cannot find hdfs-default configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils - Cannot find hdfs-site configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils - Adding /home/was/HDFSConf/conf/core-site.xml to hadoop configuration
2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils - Adding /home/was/HDFSConf/conf/hdfs-site.xml to hadoop configuration
2020-03-19 18:59:34,344 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to kafka (auth:KERBEROS)
This is what my streaming file sink code looks like.
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("hdfs://tmp/auditlog/"), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(DefaultRollingPolicy.create()
    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
    .withMaxPartSize(1024 * 1024 * 1024)
    .build())
  .build()

result.addSink(sink).name("HDFSSink")
When I run the job I get this error stack trace:
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: HDFSSink (1/1) (27b62d6294da47491041d750daf421a0) switched from RUNNING to FAILED.
java.io.IOException: Cannot instantiate file system for URI: hdfs://tmp/auditlog
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: tmp
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:687)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:628)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
Why is it trying to connect to "tmp"? Is it not supposed to get the
namenodes from core-site.xml and hdfs-site.xml?
Can you please help with the correct way to configure the HDFS sink?
Best,
Nick.
Re: Help with flink hdfs sink
Posted by Nick Bendtner <bu...@gmail.com>.
Thank you so much guys. I used the "hdfs://nameservice/path/of/your/file"
form, and it works fine for me now.
Best,
Nick
Re: Help with flink hdfs sink
Posted by Yang Wang <da...@gmail.com>.
I think Jingsong is right. You are missing a slash in your HDFS path.
An HDFS path usually looks like "hdfs://nameservice/path/of/your/file",
and the nameservice can be omitted if you want to use the defaultFS
configured in core-site.xml.
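As a quick illustration, here is a small sketch using only java.net.URI (no Flink involved) of how the authority component of each form is parsed:

```scala
import java.net.URI

// Two slashes: the segment after "hdfs://" is the authority, i.e. the
// namenode host (or HA nameservice) that the HDFS client must resolve.
val withAuthority = new URI("hdfs://nameservice/path/of/your/file")
println(withAuthority.getHost) // nameservice
println(withAuthority.getPath) // /path/of/your/file

// Three slashes: the authority is empty, so the HDFS client falls back
// to the defaultFS configured in core-site.xml.
val withDefaultFs = new URI("hdfs:///path/of/your/file")
println(withDefaultFs.getHost) // null
println(withDefaultFs.getPath) // /path/of/your/file
```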
Best,
Yang
Re: Help with flink hdfs sink
Posted by Jingsong Li <ji...@gmail.com>.
Hi Nick,
You can try new Path("hdfs:///tmp/auditlog/"). Note the one additional /
after "hdfs://": "hdfs" is only the protocol name, so with "hdfs://tmp/..."
the "tmp" part is interpreted as the namenode host.
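To see why "hdfs://tmp/auditlog/" fails, here is a minimal sketch with plain java.net.URI (no Flink involved) comparing the two spellings:

```scala
import java.net.URI

// With only two slashes, "tmp" is parsed as the URI authority, so the
// HDFS client tries to resolve a namenode host named "tmp" -- which is
// exactly the java.net.UnknownHostException: tmp in the stack trace.
val wrong = new URI("hdfs://tmp/auditlog/")
println(wrong.getHost) // tmp
println(wrong.getPath) // /auditlog/

// With three slashes, the authority is empty and the whole remainder is
// the path; the filesystem then comes from defaultFS in core-site.xml.
val right = new URI("hdfs:///tmp/auditlog/")
println(right.getHost) // null
println(right.getPath) // /tmp/auditlog/
```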
Best,
Jingsong Lee