Posted to user@flink.apache.org by Lasse Dalegaard <LD...@fullrate.dk> on 2016/03/18 12:53:50 UTC

RollingSink with APIs requiring fs+path

Hello,

I'm working on a project where I stream in data from Kafka, massage it a bit,
and then write it into HDFS using the RollingSink. This works just fine using
the provided examples, but I would like the data to be stored in ORC on HDFS
rather than in sequence files.

I am, however, unsure how to do this. I'm trying to create a new Writer class
that can be set on the sink using setWriter, but the writer's open() API is:

  open(org.apache.hadoop.fs.FSDataOutputStream outStream)

This is problematic, because the ORC writer API has a constructor with
signature:

  WriterImpl(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path,
    OrcFile.WriterOptions opts)

Since it's not possible to extract the fs/path from an FSDataOutputStream, or
alternatively to inject the FSDataOutputStream into the ORC writer (it appears
to require the path for some memory management), I looked at Parquet, which
turns out to have the following constructor signature for its writer:

  ParquetFileWriter(Configuration configuration, MessageType schema, Path file)

Again, the file path is required.

At this point I'm a bit lost as to how to proceed. Looking over the RollingSink
code, it appears that the Writer interface could be changed to accept the
filesystem and path, and the current Writer functionality for managing the
FSDataOutputStream could then be moved to a class implementing the new Writer
interface. This way the RollingSink could easily interface with Parquet and ORC
from the Hadoop ecosystem. It seems like this would not affect the failure
semantics of the sink.
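
To make that concrete, here is a rough, untested sketch of what I have in
mind (the names are purely illustrative, not actual Flink API):

  // Rough sketch only. The writer opens its own output from fs + path, so
  // path-based formats like ORC and Parquet can plug in. The current
  // stream-based behaviour would move into a class implementing this.
  public interface Writer<T> {
    void open(org.apache.hadoop.fs.FileSystem fs,
              org.apache.hadoop.fs.Path path) throws java.io.IOException;
    void write(T element) throws java.io.IOException;
    void close() throws java.io.IOException;
  }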

I might of course be missing something obvious - if so, any hints would be
greatly appreciated, as this is my first venture into big data, and especially
into Flink, which I'm enjoying very much! :)


Best regards!

Lasse

Re: RollingSink with APIs requiring fs+path

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks, great to hear that! 

- aljoscha
> On 18 Mar 2016, at 14:52, Lasse Dalegaard <LD...@fullrate.dk> wrote:
> 
> Hello,
> 
> Thanks for verifying my hypothesis. I've created FLINK-3637 (https://issues.apache.org/jira/browse/FLINK-3637) and will start working on this :-)
> 
> Best regards,
> Lasse
> ________________________________________
> From: Aljoscha Krettek <al...@apache.org>
> Sent: Friday, March 18, 2016 1:56 PM
> To: user@flink.apache.org
> Subject: Re: RollingSink with APIs requiring fs+path
> 
> Hi,
> you are right, it is currently only possible to write to an FSDataOutputStream. It could be generalized as you suggest. One thing that needs to be taken care of, however, is that the write offsets are correctly checkpointed to ensure exactly-once semantics in case of failure. Right now, we directly use the FSDataOutputStream to determine the write offset at the time of checkpointing; this could be moved into the Writer interface. I had a quick look at the ORC Writer, and this method https://hive.apache.org/javadocs/r2.0.0/api/org/apache/orc/Writer.html#writeIntermediateFooter() should do the trick, both for checkpointing the write offset and for making sure that a half-written file can be read.
> 
> Would you maybe be interested in contributing such a change to Flink? The first step would be to open a Jira issue here: https://issues.apache.org/jira/browse/FLINK. If you are not interested in working on this, someone else will probably pick it up.
> 
> Cheers,
> Aljoscha
>> On 18 Mar 2016, at 12:53, Lasse Dalegaard <LD...@fullrate.dk> wrote:
>> 
>> Hello,
>> 
>> I'm working on a project where I stream in data from Kafka, massage it a bit,
>> and then write it into HDFS using the RollingSink. This works just fine using
>> the provided examples, but I would like the data to be stored in ORC on HDFS
>> rather than in sequence files.
>>
>> I am, however, unsure how to do this. I'm trying to create a new Writer class
>> that can be set on the sink using setWriter, but the writer's open() API is:
>> 
>>  open(org.apache.hadoop.fs.FSDataOutputStream outStream)
>> 
>> This is problematic, because the ORC writer API has a constructor with
>> signature:
>> 
>>  WriterImpl(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path,
>>    OrcFile.WriterOptions opts)
>> 
>> Since it's not possible to extract the fs/path from an FSDataOutputStream, or
>> alternatively to inject the FSDataOutputStream into the ORC writer (it appears
>> to require the path for some memory management), I looked at Parquet, which
>> turns out to have the following constructor signature for its writer:
>> 
>>  ParquetFileWriter(Configuration configuration, MessageType schema, Path file)
>> 
>> Again, the file path is required.
>> 
>> At this point I'm a bit lost as to how to proceed. Looking over the RollingSink
>> code, it appears that the Writer interface could be changed to accept the
>> filesystem and path, and the current Writer functionality for managing the
>> FSDataOutputStream could then be moved to a class implementing the new Writer
>> interface. This way the RollingSink could easily interface with Parquet and ORC
>> from the Hadoop ecosystem. It seems like this would not affect the failure
>> semantics of the sink.
>> 
>> I might of course be missing something obvious - if so, any hints would be
>> greatly appreciated, as this is my first venture into big data, and especially
>> into Flink, which I'm enjoying very much! :)
>> 
>> Best regards!
>> Lasse
> 


RE: RollingSink with APIs requiring fs+path

Posted by Lasse Dalegaard <LD...@fullrate.dk>.
Hello,

Thanks for verifying my hypothesis. I've created FLINK-3637 (https://issues.apache.org/jira/browse/FLINK-3637) and will start working on this :-)

Best regards,
Lasse
________________________________________
From: Aljoscha Krettek <al...@apache.org>
Sent: Friday, March 18, 2016 1:56 PM
To: user@flink.apache.org
Subject: Re: RollingSink with APIs requiring fs+path

Hi,
you are right, it is currently only possible to write to an FSDataOutputStream. It could be generalized as you suggest. One thing that needs to be taken care of, however, is that the write offsets are correctly checkpointed to ensure exactly-once semantics in case of failure. Right now, we directly use the FSDataOutputStream to determine the write offset at the time of checkpointing; this could be moved into the Writer interface. I had a quick look at the ORC Writer, and this method https://hive.apache.org/javadocs/r2.0.0/api/org/apache/orc/Writer.html#writeIntermediateFooter() should do the trick, both for checkpointing the write offset and for making sure that a half-written file can be read.

Would you maybe be interested in contributing such a change to Flink? The first step would be to open a Jira issue here: https://issues.apache.org/jira/browse/FLINK. If you are not interested in working on this, someone else will probably pick it up.

Cheers,
Aljoscha
> On 18 Mar 2016, at 12:53, Lasse Dalegaard <LD...@fullrate.dk> wrote:
>
> Hello,
>
> I'm working on a project where I stream in data from Kafka, massage it a bit,
> and then write it into HDFS using the RollingSink. This works just fine using
> the provided examples, but I would like the data to be stored in ORC on HDFS
> rather than in sequence files.
>
> I am, however, unsure how to do this. I'm trying to create a new Writer class
> that can be set on the sink using setWriter, but the writer's open() API is:
>
>   open(org.apache.hadoop.fs.FSDataOutputStream outStream)
>
> This is problematic, because the ORC writer API has a constructor with
> signature:
>
>   WriterImpl(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path,
>     OrcFile.WriterOptions opts)
>
> Since it's not possible to extract the fs/path from an FSDataOutputStream, or
> alternatively to inject the FSDataOutputStream into the ORC writer (it appears
> to require the path for some memory management), I looked at Parquet, which
> turns out to have the following constructor signature for its writer:
>
>   ParquetFileWriter(Configuration configuration, MessageType schema, Path file)
>
> Again, the file path is required.
>
> At this point I'm a bit lost as to how to proceed. Looking over the RollingSink
> code, it appears that the Writer interface could be changed to accept the
> filesystem and path, and the current Writer functionality for managing the
> FSDataOutputStream could then be moved to a class implementing the new Writer
> interface. This way the RollingSink could easily interface with Parquet and ORC
> from the Hadoop ecosystem. It seems like this would not affect the failure
> semantics of the sink.
>
> I might of course be missing something obvious - if so, any hints would be
> greatly appreciated, as this is my first venture into big data, and especially
> into Flink, which I'm enjoying very much! :)
>
> Best regards!
> Lasse


Re: RollingSink with APIs requiring fs+path

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you are right, it is currently only possible to write to an FSDataOutputStream. It could be generalized as you suggest. One thing that needs to be taken care of, however, is that the write offsets are correctly checkpointed to ensure exactly-once semantics in case of failure. Right now, we directly use the FSDataOutputStream to determine the write offset at the time of checkpointing; this could be moved into the Writer interface. I had a quick look at the ORC Writer, and this method https://hive.apache.org/javadocs/r2.0.0/api/org/apache/orc/Writer.html#writeIntermediateFooter() should do the trick, both for checkpointing the write offset and for making sure that a half-written file can be read.
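
To illustrate, here is a rough, untested sketch using the org.apache.orc API
(the schema and file path are just examples):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.orc.OrcFile;
  import org.apache.orc.TypeDescription;
  import org.apache.orc.Writer;

  public class OrcOffsetSketch {
    public static void main(String[] args) throws IOException {
      TypeDescription schema = TypeDescription.fromString("struct<x:bigint>");
      Writer writer = OrcFile.createWriter(new Path("/tmp/part-0.orc"),
          OrcFile.writerOptions(new Configuration()).setSchema(schema));

      // Write one row.
      VectorizedRowBatch batch = schema.createRowBatch();
      ((LongColumnVector) batch.cols[0]).vector[batch.size++] = 42L;
      writer.addRowBatch(batch);

      // At checkpoint time: write an intermediate footer. If the file is
      // truncated to the returned offset it is still a valid ORC file, so
      // this is the offset the sink would record.
      long validLength = writer.writeIntermediateFooter();
      System.out.println("valid up to offset " + validLength);

      writer.close();
    }
  }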

Would you maybe be interested in contributing such a change to Flink? The first step would be to open a Jira issue here: https://issues.apache.org/jira/browse/FLINK. If you are not interested in working on this, someone else will probably pick it up.

Cheers,
Aljoscha
> On 18 Mar 2016, at 12:53, Lasse Dalegaard <LD...@fullrate.dk> wrote:
> 
> Hello,
> 
> I'm working on a project where I stream in data from Kafka, massage it a bit,
> and then write it into HDFS using the RollingSink. This works just fine using
> the provided examples, but I would like the data to be stored in ORC on HDFS
> rather than in sequence files.
>
> I am, however, unsure how to do this. I'm trying to create a new Writer class
> that can be set on the sink using setWriter, but the writer's open() API is:
> 
>   open(org.apache.hadoop.fs.FSDataOutputStream outStream)
> 
> This is problematic, because the ORC writer API has a constructor with
> signature:
> 
>   WriterImpl(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path,
>     OrcFile.WriterOptions opts)
> 
> Since it's not possible to extract the fs/path from an FSDataOutputStream, or
> alternatively to inject the FSDataOutputStream into the ORC writer (it appears
> to require the path for some memory management), I looked at Parquet, which
> turns out to have the following constructor signature for its writer:
> 
>   ParquetFileWriter(Configuration configuration, MessageType schema, Path file)
> 
> Again, the file path is required.
> 
> At this point I'm a bit lost as to how to proceed. Looking over the RollingSink
> code, it appears that the Writer interface could be changed to accept the
> filesystem and path, and the current Writer functionality for managing the
> FSDataOutputStream could then be moved to a class implementing the new Writer
> interface. This way the RollingSink could easily interface with Parquet and ORC
> from the Hadoop ecosystem. It seems like this would not affect the failure
> semantics of the sink.
> 
> I might of course be missing something obvious - if so, any hints would be
> greatly appreciated, as this is my first venture into big data, and especially
> into Flink, which I'm enjoying very much! :)
> 
> Best regards!
> Lasse