Posted to user@flink.apache.org by Isuru Suriarachchi <is...@gmail.com> on 2017/10/11 22:19:47 UTC

Writing to an HDFS file from a Flink stream job

Hi all,

I'm just trying to use an HDFS file as the sink for my Flink stream job. I
use the following line to do so.

stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");


I have not set "fs.hdfs.hadoopconf" in my Flink configuration, as it should
work with the full HDFS file name according to [1].

However, it doesn't work as expected. The file foo is created on HDFS, but it
is empty, and I don't see any error logs on the Flink side either. When I used
a normal file sink with a "file:///.." URL, it worked fine and the data was in
the file.

Do I need any other configuration to get this working?

Thanks,
Isuru

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs

Re: Writing to an HDFS file from a Flink stream job

Posted by Isuru Suriarachchi <is...@gmail.com>.
Thanks for all your directions. BucketingSink worked.

Isuru

On Thu, Oct 12, 2017 at 9:05 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> I think the issue might be that writeAsText (TextOutputFormat) doesn’t
> flush the data anywhere (only on close, which in streaming doesn’t happen).
> You would need to use a custom output format, but as Aljoscha pointed out,
> BucketingSink makes more sense for streaming applications.
>
> Piotrek
>
> On 12 Oct 2017, at 14:58, Aljoscha Krettek <al...@apache.org> wrote:
>
> Hi Isuru,
>
> What is the source in your job and is the job terminating at some point or
> running continuously?
>
> In general, the writeAsText()/writeAsCsv() methods should not be used
> because they don't work well in an infinite streaming job that might have
> failures and recovery. For example, it is unclear what should happen to the
> file when the job recovers. For writing to files you would use the
> BucketingSink:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html
>
> Best,
> Aljoscha
>
> On 12. Oct 2017, at 14:55, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi,
>
> Maybe this is an access rights issue? Could you try to create and write to
> the same file (same directory) in some other way (manually?), using the same
> user and the same machine as the Flink job would?
>
> Maybe there will be some hint in the HDFS logs?
>
> Piotrek
>
> On 12 Oct 2017, at 00:19, Isuru Suriarachchi <is...@gmail.com> wrote:
>
> Hi all,
>
> I'm just trying to use an HDFS file as the sink for my Flink stream job. I
> use the following line to do so.
>
> stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");
>
>
> I have not set "fs.hdfs.hadoopconf" in my Flink configuration, as it should
> work with the full HDFS file name according to [1].
>
> However, it doesn't work as expected. The file foo is created on HDFS, but
> it is empty, and I don't see any error logs on the Flink side either. When
> I used a normal file sink with a "file:///.." URL, it worked fine and the
> data was in the file.
>
> Do I need any other configuration to get this working?
>
> Thanks,
> Isuru
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs

Re: Writing to an HDFS file from a Flink stream job

Posted by Piotr Nowojski <pi...@data-artisans.com>.
I think the issue might be that writeAsText (TextOutputFormat) doesn’t flush the data anywhere (only on close, which in streaming doesn’t happen). You would need to use a custom output format, but as Aljoscha pointed out, BucketingSink makes more sense for streaming applications.
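
As a rough illustration of the flush-on-close effect described above, here is a minimal sketch in plain java.io/java.nio. It is not Flink's TextOutputFormat, and the class and method names (FlushOnCloseDemo, sizesBeforeAndAfterClose) are made up for this example. It only assumes a buffered writer that is never flushed explicitly, which is enough to reproduce "the file exists but is empty" for as long as the writer stays open.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlushOnCloseDemo {

    /**
     * Writes one short record through a buffered writer and returns the
     * on-disk file size measured before and after close().
     */
    static long[] sizesBeforeAndAfterClose() throws IOException {
        // Hypothetical temp file standing in for the sink's output file.
        Path file = Files.createTempFile("foo", ".txt");
        try {
            long before;
            BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
            try {
                writer.write("hello");
                writer.newLine();
                // No flush() was called, so the record is still sitting in the
                // writer's in-memory buffer; the file exists but has no bytes yet.
                before = Files.size(file);
            } finally {
                // close() flushes the buffer. A job that never finishes
                // never reaches this point.
                writer.close();
            }
            long after = Files.size(file);
            return new long[] {before, after};
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException {
        long[] sizes = sizesBeforeAndAfterClose();
        System.out.println("size before close: " + sizes[0]);
        System.out.println("size after close:  " + sizes[1]);
    }
}
```

The size before close() is 0 because the record is far smaller than the writer's buffer. A continuously running streaming job is in that "before close" state indefinitely, which would match the empty file foo.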

Piotrek



Re: Writing to an HDFS file from a Flink stream job

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Isuru,

What is the source in your job and is the job terminating at some point or running continuously?

In general, the writeAsText()/writeAsCsv() methods should not be used because they don't work well in an infinite streaming job that might have failures and recovery. For example, it is unclear what should happen to the file when the job recovers. For writing to files you would use the BucketingSink: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html

Best,
Aljoscha



Re: Writing to an HDFS file from a Flink stream job

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Maybe this is an access rights issue? Could you try to create and write to the same file (same directory) in some other way (manually?), using the same user and the same machine as the Flink job would?

Maybe there will be some hint in the HDFS logs?

Piotrek
