Posted to user@flink.apache.org by Hans-Peter Zorn <hp...@gmail.com> on 2015/08/16 19:09:57 UTC

Flink to ingest from Kafka to HDFS?

Hi,

Has anybody thought of (mis)using Flink streaming as an alternative to
Apache Flume, just for ingesting data from Kafka (or other streaming
sources) into HDFS? Since Flink can read from Kafka and write to HDFS,
I assume it should be possible, but is it a good idea?

Flume is basically about consuming data from somewhere, peeking into each
record, and then reliably directing it to a specific directory/file in HDFS.
I've seen there is a FlumeSink, but would it be possible to get the same
functionality with Flink alone?

I've skimmed through the documentation and found the option to split the
output by key and the possibility to add multiple sinks. As I understand it,
Flink programs are generally static, so it would not be possible to
add/remove sinks at runtime?
So you would need to implement a custom sink directing the records to
different files based on a key (e.g. date)? Would it be difficult to
implement things like rolling outputs etc.? Or is it better to just use Flume?
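
As a rough illustration of "directing the records to different files based
on a key (e.g. date)", the routing decision itself can be quite small. The
sketch below is plain Java and not a Flink API: the class name, the
partitionFor method and the "dt=" directory layout are all assumptions made
up for this example. A custom sink would call something like this per record
and keep one open file per returned directory.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: derives an output directory from a record timestamp. */
final class DatePartitionRouter {

    // Formats an instant as a UTC calendar date such as 2015-08-16.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    private DatePartitionRouter() {
    }

    /** Returns basePath/dt=<UTC date> for the given epoch-millisecond timestamp. */
    static String partitionFor(String basePath, long epochMillis) {
        return basePath + "/dt=" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 1439744997000 ms is 2015-08-16 17:09:57 UTC.
        System.out.println(partitionFor("/data/events", 1439744997000L));
        // prints /data/events/dt=2015-08-16
    }
}
```

Deriving the directory from the record keeps the job graph static: the set of
sinks never changes, only the set of files a single sink has open.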

Best,
Hans-Peter

Re: Flink to ingest from Kafka to HDFS?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I have an open Pull Request for a RollingFile sink. It is integrated with
checkpointing, so it can provide exactly-once behavior. If you're
interested, please check it out: https://github.com/apache/flink/pull/1084

Cheers,
Aljoscha

On Wed, 26 Aug 2015 at 10:31 Stephan Ewen <se...@apache.org> wrote:

> BTW: We are currently working on adding a rolling-file HDFS sink to Flink
> that will initially work very similarly to what Flume gives you. If I
> understand it correctly, Flume may have duplicates in the output from
> incomplete flushes on failures.
>
> We actually have a design to extend this later to a proper "exactly once"
> sink, integrated into Flink's checkpointing, which discards duplicates
> properly by offset tracking and truncating/compacting.

Re: Flink to ingest from Kafka to HDFS?

Posted by Stephan Ewen <se...@apache.org>.
BTW: We are currently working on adding a rolling-file HDFS sink to Flink
that will initially work very similarly to what Flume gives you. If I
understand it correctly, Flume may have duplicates in the output from
incomplete flushes on failures.

We actually have a design to extend this later to a proper "exactly once"
sink, integrated into Flink's checkpointing, which discards duplicates
properly by offset tracking and truncating/compacting.
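
The "offset tracking and truncating" idea can be sketched roughly as follows.
This is only an illustration under loud assumptions: it uses a local file
through java.nio as a stand-in for HDFS, it assumes the target file system
supports truncation at all, and the class and method names (snapshot, restore)
are made up here and are not Flink's checkpointing interfaces.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical writer that can roll a file back to its last checkpointed length. */
final class TruncatingFileWriter {

    private final Path file;

    TruncatingFileWriter(Path file) {
        this.file = file;
    }

    /** Appends one record to the end of the file. */
    void write(String record) throws IOException {
        Files.write(file, record.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /** Called at checkpoint time: returns the length that is safe to keep. */
    long snapshot() throws IOException {
        return Files.exists(file) ? Files.size(file) : 0L;
    }

    /** Called on recovery: drops every byte written after the checkpoint. */
    void restore(long validLength) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            // truncate() leaves the file unchanged if it is already shorter.
            channel.truncate(validLength);
        }
    }
}
```

After a failure, records written since the last checkpoint are replayed by the
source, so cutting the file back to the checkpointed length before resuming is
what prevents them from appearing twice.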


On Wed, Aug 26, 2015 at 10:04 AM, Hans-Peter Zorn <hp...@gmail.com> wrote:

> Hi Stephan,
>
> even though I started the discussion, I was just trying to estimate the
> effort. In that project they finally opted to use Flume with a Kafka
> channel.
>
> Best, Hans-Peter
>
> On Wed, Aug 26, 2015 at 9:52 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
>
>> Hi Stephan,
>>
>> I do not have a Kafka->HDFS solution, but I do have a streaming sink that
>> writes to HDFS (external, text Hive table) with auto-partitioning and
>> rolling files. However, it does not take care of checkpointing and may have
>> flushing issues if some partitions are seldom seen.
>>
>> I’m not sure it will save you much time, especially given that it has not
>> really been used yet.
>>
>> Code is provided with no copyright and no warranty!
>>
>> import java.io.BufferedOutputStream;
>> import java.io.IOException;
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import org.apache.commons.io.IOUtils;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
>> import org.apache.hive.hcatalog.data.schema.HCatSchema;
>> import org.joda.time.DateTime;
>>
>> /**
>>  * This sink streams data to a HDFS directory (hive external table) with a size limit (rolling files) and automatic
>>  * partitioning. To be able to read the file content while it’s still being written, an idea is to add a char(1) field in the last
>>  * position of the hive line and to check if it has the proper value when read (if not, the line is not complete)
>>  *
>>  * @author alinz
>>  */
>> public class HiveStreamOutput extends RichSinkFunction<Tuple2<String, DefaultHCatRecord>> {
>>
>>     /**
>>      * The Class StreamingFile, encapsulates an open output hdfs file
>>      */
>>     public static class StreamingFile {
>>
>>         /** base directory*/
>>         private final String rootPath;
>>         /** prefix*/
>>         private final String prefix;
>>
>>         /** file path*/
>>         private Path path;
>>
>>         /** open output stream */
>>         private BufferedOutputStream stream;
>>
>>         /** current size */
>>         private long size = 0;
>>
>>         /** current file number*/
>>         private long nbFile = 0;
>>
>>         /** instant of the last writing on this stream. If the interval is too long, flushes content*/
>>         private long lastInvoke;
>>
>>         /**
>>          * Instantiates a new streaming file.
>>          * @param rootPath destination path
>>          * @param prefix file name prefix
>>          * @throws IOException cannot open file
>>          */
>>         public StreamingFile(String rootPath, String prefix) throws IOException {
>>             super();
>>             this.rootPath = rootPath;
>>             this.prefix = prefix;
>>             lastInvoke = 0; // always flushes first record
>>             open();
>>         }
>>
>>         /**
>>          * Create destination file on FS
>>          * @throws IOException issue when opening file
>>          */
>>         private void open() throws IOException {
>>             this.path = new Path(rootPath, prefix + nbFile);
>>             final FileSystem filesys = path.getFileSystem();
>>             filesys.mkdirs(path.getParent());
>>             stream = new BufferedOutputStream(filesys.create(path, true));
>>         }
>>
>>         /**
>>          * closes stream
>>          */
>>         public void closeStream() {
>>             IOUtils.closeQuietly(stream);
>>             stream = null; // NOPMD
>>         }
>>
>>         /**
>>          * Write data into the stream
>>          * @param data data to write
>>          * @param maxSize max size of data ; split the file if we reach it
>>          * @throws IOException writing issue
>>          */
>>         public void writeStream(byte[] data, long maxSize) throws IOException {
>>             stream.write(data);
>>             // If the source is too slow, flushes the data. Using this method, We do not always have the "last flushes",
>>             // especially concerning old partitions.
>>             // TODO If it's an issue, implements a time out thread.
>>             final long maxDelayFlush = 100;
>>             final long invokeTime = System.currentTimeMillis();
>>             if (invokeTime - lastInvoke > maxDelayFlush) {
>>                 stream.flush();
>>             }
>>             lastInvoke = invokeTime;
>>             if (incTaille(data.length) >= maxSize) {
>>                 split();
>>             }
>>         }
>>
>>         /**
>>          * increment file size
>>          * @param amount what to add
>>          * @return the new size
>>          */
>>         private long incTaille(long amount) {
>>             size += amount;
>>             return size;
>>         }
>>
>>         /**
>>          * Closes current file and open a new one
>>          * @throws IOException issue when opening file
>>          */
>>         private void split() throws IOException {
>>             closeStream();
>>             nbFile++;
>>             open();
>>             size = 0;
>>         }
>>
>>         /**
>>          * flushes stream
>>          * @throws IOException I/O issue
>>          */
>>         public void flushStream() throws IOException {
>>             stream.flush();
>>         }
>>     }
>>
>>     /** SUID. */
>>     private static final long serialVersionUID = 1L;
>>
>>     // Shared fields
>>
>>     /** Output hive table scheme */
>>     private final HCatSchema outputSchema;
>>
>>     /** field delimiter */
>>     private final char delim;
>>
>>     /** hdfs root path */
>>     private final String hdfsPath;
>>
>>     /** Max file size */
>>     private final long maxSize;
>>
>>     // Subtask fields
>>
>>     /** filename prefix for a subtask, prevents conflicts with another subtask or a previous run */
>>     private transient String namePrefix;
>>
>>     /** map of streams indexed per met partition */
>>     private transient Map<String, StreamingFile> streams;
>>
>>     /** instant of the last periodic flush */
>>     private transient long lastFlushAll;
>>
>>     /**
>>      * Builds a streamer.
>>      * @param outputSchema output record schema (without partition)
>>      * @param delim field delimiter
>>      * @param hdfsPath HDFS destination path
>>      * @param maxSize max size of a file (rolls the file if reached)
>>      */
>>     public HiveStreamOutput(HCatSchema outputSchema, char delim, String hdfsPath, long maxSize) {
>>         super();
>>         this.outputSchema = outputSchema;
>>         this.delim = delim;
>>         this.hdfsPath = hdfsPath;
>>         this.maxSize = maxSize;
>>     }
>>
>>     /** {@inheritDoc} */
>>     @Override
>>     public void open(Configuration parameters) throws Exception { // NOPMD
>>         // Prefix is unique for a run and a subtask, to avoid conflicts
>>         namePrefix = "S" + getRuntimeContext().getIndexOfThisSubtask() + "_" + (new DateTime().getMillis()) + "_";
>>         streams = new HashMap<String, StreamingFile>();
>>     }
>>
>>     /** {@inheritDoc} */
>>     @Override
>>     public void close() throws Exception { // NOPMD
>>         for (final StreamingFile file : streams.values()) {
>>             file.closeStream();
>>         }
>>     }
>>
>>     /** {@inheritDoc} */
>>     @Override
>>     public void invoke(Tuple2<String, DefaultHCatRecord> value) throws Exception { // NOPMD
>>         final String partition = value.f0;
>>         final String record = HiveFileOutputFormat.getRecordLine(value.f1, outputSchema, delim);
>>         // Do we have an open data stream for this partition ?
>>         StreamingFile file = streams.get(partition);
>>         if (file == null) {
>>             file = new StreamingFile(hdfsPath + "/" + partition, namePrefix);
>>             streams.put(partition, file);
>>         }
>>         file.writeStream(record.getBytes(), maxSize);
>>
>>         // Periodically flush all streams
>>         final long invoke = System.currentTimeMillis();
>>         final long flushPeriod = 10000;
>>         if (invoke - lastFlushAll > flushPeriod) {
>>             lastFlushAll = invoke;
>>             for (final StreamingFile stream : streams.values()) {
>>                 stream.flushStream();
>>             }
>>         }
>>     }
>> }
>>
>> And the missing HiveFileOutputFormat.getRecordLine:
>>
>>     /**
>>      * TODO partitions should not be part of the line. But since they are in the last position, it's a minor issue
>>      * Shared method to transform a hive record into a text line<br>
>>      * TODO use of deprecated types is more convenient, but I should get rid of it.<br>
>>      * @param record hive record
>>      * @param schema line scheme
>>      * @param separator field delimiter
>>      * @return corresponding line, ended with \n
>>      */
>>     @SuppressWarnings("deprecation")
>>     // because it's so convenient
>>     public static String getRecordLine(DefaultHCatRecord record, HCatSchema schema, char separator) {
>>         final int fldNumbr = Math.min(schema.size(), record.size());
>>         final StringBuffer line = new StringBuffer();
>>
>>         for (int idx = 0; idx < fldNumbr; idx++) {
>>             final Object fieldVal = record.get(idx);
>>             final String strFieldVal;
>>             if (fieldVal == null) {
>>                 strFieldVal = "";
>>             }
>>             else {
>>                 switch (schema.get(idx).getType()) {
>>                     case DOUBLE:
>>                     case FLOAT:
>>                     case DECIMAL:
>>                     case BIGINT:
>>                     case INT:
>>                     case SMALLINT:
>>                     case TINYINT:
>>                     case CHAR:
>>                     case STRING:
>>                     case VARCHAR:
>>                     case BOOLEAN:
>>                     case DATE:
>>                     case TIMESTAMP:
>>                         strFieldVal = fieldVal.toString();
>>                         break;
>>                     case ARRAY:
>>                     case MAP:
>>                     case STRUCT:
>>                     case BINARY:
>>                     default:
>>                         throw new IllegalArgumentException("Complex Hive types (" + schema.get(idx).getTypeString()
>>                             + ") are not supported");
>>                 }
>>             }
>>             line.append(strFieldVal);
>>             if (idx < fldNumbr - 1) {
>>                 line.append(separator);
>>             }
>>         }
>>         line.append('\n');
>>         return line.toString();
>>     }
>>
>> From: Rico Bergmann [mailto:info@ricobergmann.de]
>> Sent: Wednesday, 26 August 2015 07:49
>> To: user@flink.apache.org
>> Subject: Re: Flink to ingest from Kafka to HDFS?
>>
>>
>>
>> Hi!
>>
>>
>>
>> Sorry, I won't be able to implement this soon. I just shared my ideas on
>> this.
>>
>>
>>
>> Greets. Rico.
>>
>>
>> On 25.08.2015 at 17:52, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi Rico!
>>
>> Can you give us an update on your status here? We actually need something
>> like this as well (and pretty urgently), so we would jump in and implement
>> this, unless you have something already.
>>
>>
>>
>> Stephan
>>
>>
>>
>>
>>
>> On Thu, Aug 20, 2015 at 12:13 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> BTW: This is becoming a dev discussion, maybe should move to that list...
>>
>>
>>
>> On Thu, Aug 20, 2015 at 12:12 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Yes, one needs exactly a mechanism to seek the output stream back to the
>> last checkpointed position, in order to overwrite duplicates.
>>
>> I think HDFS is not going to make this easy; there is basically no seek
>> for write. Not sure how to solve this, other than writing to tmp files and
>> copying upon success.
>>
>> Apache Flume must have solved this issue in some way; it may be worth
>> looking into how they solved it.
>>
>>
>>
>> On Thu, Aug 20, 2015 at 11:58 AM, Rico Bergmann <in...@ricobergmann.de>
>> wrote:
>>
>> My ideas for checkpointing:
>>
>>
>>
>> I think writing to the destination should not depend on the checkpoint
>> mechanism (otherwise the output would never be written to the destination
>> if checkpointing is disabled). Instead I would keep the offsets of written
>> and checkpointed records. When recovering you would then somehow delete or
>> overwrite the records after that offset. (But I don't really know whether
>> this is as simple as I wrote it ;-) ).
>>
>>
>>
>> Regarding the rolling files I would suggest making the values of the
>> user-defined partitioning function part of the path or file name. Writing
>> records is then basically:
>>
>> Extract the partition to write to, then add the record to a queue for
>> this partition. Each queue has an output format assigned to it. On flushing
>> the output file is opened, the content of the queue is written to it, and
>> then closed.
>>
>>
>>
>> Does this sound reasonable?
>>
>>
>> On 20.08.2015 at 10:40, Aljoscha Krettek <al...@apache.org> wrote:
>>
>> Yes, this seems like a good approach. We should probably not reuse the
>> KeySelector for this but maybe a more use-case specific type of function
>> that can create a desired filename from an input object.
>>
>> This is only the first part, though. The hard bit would be implementing
>> rolling files and also integrating it with Flink's checkpointing mechanism.
>> For integration with checkpointing you could maybe use "staging files": all
>> elements are put into a staging file. And then, when the notification about
>> a completed checkpoint is received, the contents of this file would be moved
>> (or appended) to the actual destination.
>>
>> Do you have any ideas about the rolling files/checkpointing?
>>
>>
>>
>> On Thu, 20 Aug 2015 at 09:44 Rico Bergmann <in...@ricobergmann.de> wrote:
>>
>> I'm thinking about implementing this.
>>
>>
>>
>> After looking into the Flink code I would basically subclass
>> FileOutputFormat in, let's say, KeyedFileOutputFormat, which gets an
>> additional KeySelector object. The string returned by the KeySelector is
>> then appended to the path in the file system.
>>
>> Do you think this is a good approach?
>>
>>
>>
>> Greets. Rico.
>>
>>
>> On 16.08.2015 at 19:56, Stephan Ewen <se...@apache.org> wrote:
>>
>> If you are up for it, this would be a very nice addition to Flink, a
>> great contribution :-)
>>
>>
>>
>> On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>>
>>
>> This should definitely be possible in Flink. Pretty much exactly like you
>> describe it.
>>
>>
>>
>> You need a custom version of the HDFS sink with some logic for when to
>> roll over to a new file.
>>
>>
>>
>> You can also make the sink "exactly once" by integrating it with the
>> checkpointing. For that, you would probably need to keep the current path
>> and output stream offsets as of the last checkpoint, so you can resume from
>> that offset and overwrite records to avoid duplicates. If that is not
>> possible, you would probably buffer records between checkpoints and only
>> write on checkpoints.
>>
>>
>>
>> Greetings,
>> Stephan
>>
>>
>
>

Re: Flink to ingest from Kafka to HDFS?

Posted by Hans-Peter Zorn <hp...@gmail.com>.
Hi Stephan,

even though I started the discussion, I was just trying to estimate the
effort. In that project they finally opted to use Flume with a Kafka
channel.

Best, Hans-Peter

On Wed, Aug 26, 2015 at 9:52 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> Hi Stephan,
>
> I do not have a Kafka->HDFS solution, but I do have a streaming sink that
> writes to HDFS (external, text hive table) with auto-partitioning and
> rolling files. However, it does not take care of checkpointing and may have
> flushing issues if some partitions are seldom seen.
>
> I’m not sure it will save you much time, especially given the fact that it
> has not been really used yet.

RE: Flink to ingest from Kafka to HDFS?

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi Stephan,

I do not have a Kafka->HDFS solution, but I do have a streaming sink that writes to HDFS (external, text hive table) with auto-partitioning and rolling files. However, it does not take care of checkpointing and may have flushing issues if some partitions are seldom seen.

I’m not sure it will save you much time, especially given the fact that it has not been really used yet.

Code is provided with no copyright and no warranty!

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.joda.time.DateTime;

/**
 * This sink streams data to an HDFS directory (Hive external table) with a size limit (rolling files) and automatic
 * partitioning. To be able to read the file content while it is still being written, one idea is to add a char(1) field in the last
 * position of the Hive line and to check whether it has the expected value when read (if not, the line is incomplete).
 *
 * @author alinz
 */
public class HiveStreamOutput extends RichSinkFunction<Tuple2<String, DefaultHCatRecord>> {

    /**
     * The Class StreamingFile, encapsulates an open output hdfs file
     */
    public static class StreamingFile {

        /** base directory*/
        private final String rootPath;
        /** prefix*/
        private final String prefix;

        /** file path*/
        private Path path;

        /** open output stream */
        private BufferedOutputStream stream;

        /** current size */
        private long size = 0;

        /** current file number*/
        private long nbFile = 0;

        /** time of the last write on this stream; if the interval since then is too long, the content is flushed */
        private long lastInvoke;

        /**
         * Instantiates a new streaming file.
         * @param rootPath destination path
         * @param prefix file name prefix
         * @throws IOException cannot open file
         */
        public StreamingFile(String rootPath, String prefix) throws IOException {
            super();
            this.rootPath = rootPath;
            this.prefix = prefix;
            lastInvoke = 0; // always flushes first record
            open();
        }

        /**
         * Create destination file on FS
         * @throws IOException issue when opening file
         */
        private void open() throws IOException {
            this.path = new Path(rootPath, prefix + nbFile);
            final FileSystem filesys = path.getFileSystem();
            filesys.mkdirs(path.getParent());
            stream = new BufferedOutputStream(filesys.create(path, true));
        }

        /**
         * closes stream
         */
        public void closeStream() {
            IOUtils.closeQuietly(stream);
            stream = null; // NOPMD
        }

        /**
         * Write data into the stream
         * @param data data to write
         * @param maxSize max size of data ; split the file if we reach it
         * @throws IOException writing issue
         */
        public void writeStream(byte[] data, long maxSize) throws IOException {
            stream.write(data);
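            // If records arrive slowly (more than maxDelayFlush ms since the previous write), flush immediately.
            // A flush only happens when a record arrives, so the last records of a partition that stops
            // receiving data (typically an old partition) are not flushed here.
            // TODO If this is an issue, implement a timeout thread.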
            final long maxDelayFlush = 100;
            final long invokeTime = System.currentTimeMillis();
            if (invokeTime - lastInvoke > maxDelayFlush) {
                stream.flush();
            }
            lastInvoke = invokeTime;
            if (incTaille(data.length) >= maxSize) {
                split();
            }
        }

        /**
         * increment file size
         * @param amount what to add
         * @return the new size
         */
        private long incTaille(long amount) {
            size += amount;
            return size;
        }

        /**
         * Closes current file and open a new one
         * @throws IOException issue when opening file
         */
        private void split() throws IOException {
            closeStream();
            nbFile++;
            open();
            size = 0;
        }

        /**
         * flushes stream
         * @throws IOException I/O issue
         */
        public void flushStream() throws IOException {
            stream.flush();
        }
    }

    /** SUID. */
    private static final long serialVersionUID = 1L;

    // Shared fields

    /** Output hive table schema */
    private final HCatSchema outputSchema;

    /** field delimiter */
    private final char delim;

    /** hdfs root path */
    private final String hdfsPath;

    /** Max file size */
    private final long maxSize;

    // Subtask fields

    /** filename prefix for a subtask, prevents conflicts with another subtask or a previous run */
    private transient String namePrefix;

    /** map of open streams, indexed by the partitions encountered so far */
    private transient Map<String, StreamingFile> streams;

    /** instant of the last periodic flush */
    private transient long lastFlushAll;

    /**
     * Builds a streamer.
     * @param outputSchema output record schema (without partition)
     * @param delim field delimiter
     * @param hdfsPath HDFS destination path
     * @param maxSize max size of a file (rolls the file if reached)
     */
    public HiveStreamOutput(HCatSchema outputSchema, char delim, String hdfsPath, long maxSize) {
        super();
        this.outputSchema = outputSchema;
        this.delim = delim;
        this.hdfsPath = hdfsPath;
        this.maxSize = maxSize;
    }

    /** {@inheritDoc} */
    @Override
    public void open(Configuration parameters) throws Exception { // NOPMD
        // Prefix is unique for a run and a subtask, to avoid conflicts
        namePrefix = "S" + getRuntimeContext().getIndexOfThisSubtask() + "_" + (new DateTime().getMillis()) + "_";
        streams = new HashMap<String, StreamingFile>();
    }

    /** {@inheritDoc} */
    @Override
    public void close() throws Exception { // NOPMD
        for (final StreamingFile file : streams.values()) {
            file.closeStream();
        }
    }

    /** {@inheritDoc} */
    @Override
    public void invoke(Tuple2<String, DefaultHCatRecord> value) throws Exception { // NOPMD
        final String partition = value.f0;
        final String record = HiveFileOutputFormat.getRecordLine(value.f1, outputSchema, delim);
        // Do we have an open data stream for this partition ?
        StreamingFile file = streams.get(partition);
        if (file == null) {
            file = new StreamingFile(hdfsPath + "/" + partition, namePrefix);
            streams.put(partition, file);
        }
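        // Explicit charset instead of the platform default, so the output does not depend on the JVM settings
        file.writeStream(record.getBytes(java.nio.charset.StandardCharsets.UTF_8), maxSize);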

        // Periodically flush all streams
        final long invoke = System.currentTimeMillis();
        final long flushPeriod = 10000;
        if (invoke - lastFlushAll > flushPeriod) {
            lastFlushAll = invoke;
            for (final StreamingFile stream : streams.values()) {
                stream.flushStream();
            }
        }
    }
}
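
The TODO in writeStream mentions a timeout thread for partitions that are seldom seen. A minimal sketch of that idea follows, assuming a scheduled executor is acceptable inside the sink. The class PeriodicFlusher and its methods are hypothetical and not part of the code above; the writing thread would have to synchronize on the same object while writing or rolling a file.

```java
import java.io.Flushable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the sink above: flushes registered streams on a timer. */
final class PeriodicFlusher implements AutoCloseable {

    /** Registered targets, one per partition. */
    private final Map<String, Flushable> targets = new ConcurrentHashMap<>();

    /** Single daemon thread, so a forgotten close() does not keep the JVM alive. */
    private final ScheduledExecutorService timer;

    PeriodicFlusher(long period, TimeUnit unit) {
        timer = Executors.newSingleThreadScheduledExecutor(runnable -> {
            final Thread thread = new Thread(runnable, "periodic-flusher");
            thread.setDaemon(true);
            return thread;
        });
        timer.scheduleAtFixedRate(this::flushIgnoringErrors, period, period, unit);
    }

    /** Registers (or replaces) the target to flush for a partition. */
    void register(String partition, Flushable target) {
        targets.put(partition, target);
    }

    /** Flushes every registered target while holding that target's monitor. */
    void flushAll() {
        for (final Flushable target : targets.values()) {
            synchronized (target) {
                try {
                    target.flush();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }

    /** Timer entry point: an exception escaping from here would cancel all future runs. */
    private void flushIgnoringErrors() {
        try {
            flushAll();
        } catch (RuntimeException e) {
            System.err.println("Periodic flush failed: " + e);
        }
    }

    /** Stops the timer and performs one last flush. */
    @Override
    public void close() {
        timer.shutdownNow();
        flushAll();
    }
}
```

In the sink, one would presumably create the flusher in open(Configuration), register each StreamingFile's stream when it is created, and close the flusher in close().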

And the missing HiveFileOutputFormat.getRecordLine :

    /**
     * Shared method to transform a hive record into a text line.<br>
     * TODO partitions should not be part of the line. But since they are in the last position, it's a minor issue.<br>
     * TODO use of deprecated types is more convenient, but I should get rid of it.<br>
     * @param record hive record
     * @param schema line schema
     * @param separator field delimiter
     * @return corresponding line, ended with \n
     */
    @SuppressWarnings("deprecation")
    // because it's so convenient
    public static String getRecordLine(DefaultHCatRecord record, HCatSchema schema, char separator) {
        final int fldNumbr = Math.min(schema.size(), record.size());
        final StringBuilder line = new StringBuilder(); // local variable, no synchronization needed

        for (int idx = 0; idx < fldNumbr; idx++) {
            final Object fieldVal = record.get(idx);
            final String strFieldVal;
            if (fieldVal == null) {
                strFieldVal = "";
            }
            else {
                switch (schema.get(idx).getType()) {
                    case DOUBLE:
                    case FLOAT:
                    case DECIMAL:
                    case BIGINT:
                    case INT:
                    case SMALLINT:
                    case TINYINT:
                    case CHAR:
                    case STRING:
                    case VARCHAR:
                    case BOOLEAN:
                    case DATE:
                    case TIMESTAMP:
                        strFieldVal = fieldVal.toString();
                        break;
                    case ARRAY:
                    case MAP:
                    case STRUCT:
                    case BINARY:
                    default:
                        throw new IllegalArgumentException("Complex Hive types (" + schema.get(idx).getTypeString()
                            + ") are not supported");
                }
            }
            line.append(strFieldVal);
            if (idx < fldNumbr - 1) {
                line.append(separator);
            }
        }
        line.append('\n');
        return line.toString();
    }
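
The class comment of HiveStreamOutput suggests a char(1) field in the last position of each line, so that a reader can detect a line that is still being written. A small reader-side sketch of that check follows. The class name, the method names and the marker value are assumptions made for illustration, and the check is only a heuristic: a truncated line whose last complete field happens to equal the marker would be accepted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader-side helper; not part of the code posted above. */
final class CompleteLineFilter {

    private CompleteLineFilter() {
    }

    /** A line counts as complete when its last field is exactly the one-character marker. */
    static boolean isComplete(String line, char delim, char marker) {
        final int len = line.length();
        if (len == 0 || line.charAt(len - 1) != marker) {
            return false;
        }
        // The marker must be a field of its own: the whole line, or preceded by the delimiter.
        return len == 1 || line.charAt(len - 2) == delim;
    }

    /** Keeps only the complete lines of a content that may end with a partially flushed line. */
    static List<String> completeLines(String content, char delim, char marker) {
        final List<String> result = new ArrayList<>();
        for (final String line : content.split("\n")) {
            if (isComplete(line, delim, marker)) {
                result.add(line);
            }
        }
        return result;
    }
}
```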




From: Rico Bergmann [mailto:info@ricobergmann.de]
Sent: Wednesday, 26 August 2015 07:49
To: user@flink.apache.org
Subject: Re: Flink to ingest from Kafka to HDFS?

Hi!

Sorry, I won't be able to implement this soon. I just shared my ideas on this.

Greets. Rico.


On 25.08.2015 at 17:52, Stephan Ewen <se...@apache.org> wrote:
Hi Rico!

Can you give us an update on your status here? We actually need something like this as well (and pretty urgently), so we would jump in and implement this, unless you have something already.

Stephan


On Thu, Aug 20, 2015 at 12:13 PM, Stephan Ewen <se...@apache.org> wrote:
BTW: This is becoming a dev discussion, maybe should move to that list...

On Thu, Aug 20, 2015 at 12:12 PM, Stephan Ewen <se...@apache.org> wrote:
Yes, what one needs is exactly a mechanism to seek the output stream back to the last checkpointed position, in order to overwrite duplicates.

I think HDFS is not going to make this easy, there is basically no seek for write. Not sure how to solve this, other than writing to tmp files and copying upon success.

Apache Flume must have solved this issue in some way, it may be worth looking into how they solved it.

On Thu, Aug 20, 2015 at 11:58 AM, Rico Bergmann <in...@ricobergmann.de> wrote:
My ideas for checkpointing:

I think writing to the destination should not depend on the checkpoint mechanism (otherwise the output would never be written to the destination if checkpointing is disabled). Instead, I would keep the offsets of the written and checkpointed records. When recovering, you would then somehow delete or overwrite the records after that offset. (But I don't really know whether this is as simple as I wrote it ;-) ).

Regarding the rolling files I would suggest making the values of the user-defined partitioning function part of the path or file name. Writing records is then basically:
Extract the partition to write to, then add the record to a queue for this partition. Each queue has an output format assigned to it. On flushing, the output file is opened, the content of the queue is written to it, and then the file is closed.
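
The queue-per-partition scheme described above could look roughly like the following sketch. It uses java.nio.file on a local directory as a stand-in for HDFS; the class name PartitionedBufferingWriter and the fixed file name "part-0" are assumptions made for illustration, and rolling is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: buffers records per partition and writes them out on flush. */
final class PartitionedBufferingWriter {

    /** Root directory; each partition becomes a sub-directory of it. */
    private final Path root;

    /** One queue of pending records per partition. */
    private final Map<String, List<String>> queues = new HashMap<>();

    PartitionedBufferingWriter(Path root) {
        this.root = root;
    }

    /** Adds the record to the queue of its partition; nothing is written yet. */
    void add(String partition, String record) {
        queues.computeIfAbsent(partition, key -> new ArrayList<>()).add(record);
    }

    /** For each non-empty queue: open the file, append the queued records, close the file. */
    void flush() {
        for (final Map.Entry<String, List<String>> entry : queues.entrySet()) {
            final List<String> queue = entry.getValue();
            if (queue.isEmpty()) {
                continue;
            }
            final Path dir = root.resolve(entry.getKey());
            try {
                Files.createDirectories(dir);
                // Files.write opens the file, appends one line per record and closes it again.
                Files.write(dir.resolve("part-0"), queue, StandardCharsets.UTF_8,
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            queue.clear();
        }
    }
}
```

Records that are still queued when the job fails are lost unless the source replays them, so this sketch alone gives no delivery guarantee.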

Does this sound reasonable?


On 20.08.2015 at 10:40, Aljoscha Krettek <al...@apache.org> wrote:
Yes, this seems like a good approach. We should probably not reuse the KeySelector for this, but rather use a more use-case-specific type of function that can create a desired filename from an input object.

This is only the first part, though. The hard bit would be implementing rolling files and also integrating it with Flink's checkpointing mechanism. For integration with checkpointing you could maybe use "staging-files": all elements are put into a staging file. And then, when the notification about a completed checkpoint is received, the contents of this file would be moved (or appended) to the actual destination.
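
A minimal sketch of the staging-file idea follows, again on a local directory instead of HDFS. The class StagingFileSink and the two hooks snapshot and notifyCheckpointComplete are hypothetical; how they would be wired to Flink's checkpointing is an assumption and is not shown. Records written after a checkpoint was taken go to a new staging file, so a confirmation only publishes what that checkpoint covers.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: records become visible only after their checkpoint is confirmed. */
final class StagingFileSink {

    private final Path stagingDir;
    private final Path finalDir;

    /** Staging files sealed at a checkpoint, waiting for that checkpoint to be confirmed. */
    private final TreeMap<Long, Path> pending = new TreeMap<>();

    /** Staging file currently written to, or null if none is open. */
    private Path current;
    private int fileNumber;

    StagingFileSink(Path stagingDir, Path finalDir) {
        this.stagingDir = stagingDir;
        this.finalDir = finalDir;
        try {
            Files.createDirectories(stagingDir);
            Files.createDirectories(finalDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends the record to the current staging file (opened and closed per call, for brevity). */
    void write(String record) {
        if (current == null) {
            current = stagingDir.resolve("part-" + fileNumber++);
        }
        try {
            Files.write(current, (record + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Called when a checkpoint is taken: seals the current staging file for that checkpoint. */
    void snapshot(long checkpointId) {
        if (current != null) {
            pending.put(checkpointId, current);
            current = null;
        }
    }

    /** Called when a checkpoint is confirmed: publishes every file sealed up to that checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        final Iterator<Map.Entry<Long, Path>> it = pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            final Path staged = it.next().getValue();
            try {
                Files.move(staged, finalDir.resolve(staged.getFileName()), StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            it.remove();
        }
    }
}
```

On recovery, staging files that were never published would have to be deleted before replaying from the last checkpoint; that part is left out here.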

Do you have any ideas about the rolling files/checkpointing?

On Thu, 20 Aug 2015 at 09:44 Rico Bergmann <in...@ricobergmann.de> wrote:
I'm thinking about implementing this.

After looking into the Flink code, I would basically subclass FileOutputFormat as, let's say, KeyedFileOutputFormat, which gets an additional KeySelector object. The string that the KeySelector returns is then appended to the path in the file system.
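
The path derivation itself can be sketched independently of Flink. In the snippet below, a plain java.util.function.Function stands in for the KeySelector, and the class KeyedPathResolver is hypothetical; it only shows how the key would become part of the output path.

```java
import java.util.function.Function;

/** Hypothetical helper: derives the output path of a record from a user-supplied function. */
final class KeyedPathResolver<T> {

    private final String basePath;
    private final Function<T, String> keyExtractor;

    KeyedPathResolver(String basePath, Function<T, String> keyExtractor) {
        // Drop a trailing slash so that paths are joined with exactly one separator.
        this.basePath = basePath.endsWith("/") ? basePath.substring(0, basePath.length() - 1) : basePath;
        this.keyExtractor = keyExtractor;
    }

    /** Returns basePath/key/fileName and rejects keys that could escape the base directory. */
    String resolve(T record, String fileName) {
        final String key = keyExtractor.apply(record);
        if (key == null || key.isEmpty() || key.contains("..") || key.startsWith("/")) {
            throw new IllegalArgumentException("Unusable partition key: " + key);
        }
        return basePath + "/" + key + "/" + fileName;
    }
}
```

With a key function such as rec -> "date=" + rec[0], a record dated 2015-08-20 would end up under basePath/date=2015-08-20/.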

Do you think this is a good approach?

Greets. Rico.


On 16.08.2015 at 19:56, Stephan Ewen <se...@apache.org> wrote:
If you are up for it, this would be a very nice addition to Flink, a great contribution :-)

On Sun, Aug 16, 2015 at 7:56 PM, Stephan Ewen <se...@apache.org> wrote:
Hi!

This should definitely be possible in Flink. Pretty much exactly like you describe it.

You need a custom version of the HDFS sink with some logic for when to roll over to a new file.

You can also make the sink "exactly once" by integrating it with the checkpointing. For that, you would probably need to keep the current path and output stream offsets as of the last checkpoint, so you can resume from that offset and overwrite records to avoid duplicates. If that is not possible, you would probably buffer records between checkpoints and only write on checkpoints.
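
The first variant (remember path and offset at the checkpoint, cut the file back to that offset on recovery) can be illustrated on a local file, where truncation is available through java.nio.channels.FileChannel. This is only a sketch of the idea under that assumption; the class OffsetRecovery and its methods are hypothetical, and whether the target file system allows truncating or overwriting is a separate question.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: discard everything written after the last checkpointed offset. */
final class OffsetRecovery {

    /** What would be stored in a checkpoint: the file and its length at that moment. */
    static final class FileState {
        final Path path;
        final long offset;

        FileState(Path path, long offset) {
            this.path = path;
            this.offset = offset;
        }
    }

    private OffsetRecovery() {
    }

    /** Appends one record; the file is opened and closed per call to keep the sketch short. */
    static void append(Path file, String record) {
        try {
            Files.write(file, record.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Captures the current length of the file as the checkpointed offset. */
    static FileState snapshot(Path file) {
        try {
            return new FileState(file, Files.size(file));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Cuts the file back to the checkpointed offset, so replayed records are not duplicated. */
    static void restore(FileState state) {
        try (FileChannel channel = FileChannel.open(state.path, StandardOpenOption.WRITE)) {
            channel.truncate(state.offset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

After restore, the source replays the records that followed the checkpoint, and appending them again yields each record exactly once in the file.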

Greetings,
Stephan



On Sun, Aug 16, 2015 at 7:09 PM, Hans-Peter Zorn <hp...@gmail.com> wrote:
Hi,

Did anybody think of (mis-)using Flink streaming as an alternative to Apache Flume just for ingesting data from Kafka (or other streaming sources) to HDFS? Knowing that Flink can read from Kafka and write to HDFS, I assume it should be possible, but is this a good idea?

Flume basically is about consuming data from somewhere, peeking into each record and then directing it to a specific directory/file in HDFS reliably. I've seen there is a FlumeSink, but would it be possible to get the same functionality with Flink alone?

I've skimmed through the documentation and found the option to split the output by key and the possibility to add multiple sinks. As I understand, Flink programs are generally static, so it would not be possible to add/remove sinks at runtime?
So you would need to implement a custom sink directing the records to different files based on a key (e.g. date)? Would it be difficult to implement things like rolling outputs etc? Or better just use Flume?

Best,
Hans-Peter


Re: Flink to ingest from Kafka to HDFS?

Posted by Rico Bergmann <in...@ricobergmann.de>.
Hi!

Sorry, I won't be able to implement this soon. I just shared my ideas on this. 

Greets. Rico. 




Re: Flink to ingest from Kafka to HDFS?

Posted by Stephan Ewen <se...@apache.org>.
Hi Rico!

Can you give us an update on your status here? We actually need something
like this as well (and pretty urgently), so we would jump in and implement
this, unless you have something already.

Stephan



Re: Flink to ingest from Kafka to HDFS?

Posted by Stephan Ewen <se...@apache.org>.
BTW: This is becoming a dev discussion; maybe we should move it to that list...


Re: Flink to ingest from Kafka to HDFS?

Posted by Stephan Ewen <se...@apache.org>.
Yes, what one needs is exactly a mechanism to seek the output stream back to
the last checkpointed position, in order to overwrite duplicates.

I think HDFS is not going to make this easy: there is basically no seek for
write. I'm not sure how to solve this, other than writing to tmp files and
copying upon success.

Apache Flume must have solved this issue in some way; it may be worth
looking into how they solved it.


Re: Flink to ingest from Kafka to HDFS?

Posted by Rico Bergmann <in...@ricobergmann.de>.
My ideas for checkpointing:

I think writing to the destination should not depend on the checkpoint mechanism (otherwise the output would never be written to the destination if checkpointing is disabled). Instead, I would keep the offsets of the written and the checkpointed records. When recovering, you would then somehow delete or overwrite the records after the checkpointed offset. (But I don't really know whether this is as simple as I wrote it ;-) ).
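
A minimal sketch of the offset idea, under loud assumptions: a local file accessed through java.nio stands in for the destination, and the class OffsetTrackingWriter with its methods snapshot() and restore() is a hypothetical name for illustration, not a Flink class. Whether the real target file system allows truncating or seeking for writes is a separate question that this sketch does not answer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper (not part of Flink): writes records directly to the
// destination and remembers the offset covered by the last checkpoint.
final class OffsetTrackingWriter implements AutoCloseable {
    private final FileChannel channel;
    private long checkpointedOffset;

    OffsetTrackingWriter(Path file) throws IOException {
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        // Start appending at the current end of the file.
        this.checkpointedOffset = channel.size();
        channel.position(checkpointedOffset);
    }

    // Records go to the destination immediately, independent of checkpoints.
    void write(String record) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    // Called when a checkpoint is taken: returns the offset to store in it.
    long snapshot() throws IOException {
        channel.force(true);
        checkpointedOffset = channel.position();
        return checkpointedOffset;
    }

    // Called on recovery: drops everything written after the checkpointed offset.
    void restore(long offset) throws IOException {
        channel.truncate(offset);
        channel.position(offset);
        checkpointedOffset = offset;
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

With this, a record written after the last snapshot() disappears when restore() is called with the stored offset, so replaying it does not produce a duplicate.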

Regarding the rolling files, I would suggest making the values of the user-defined partitioning function part of the path or file name. Writing records is then basically:
Extract the partition to write to, then add the record to a queue for this partition. Each queue has an output format assigned to it. On flushing, the output file is opened, the content of the queue is written to it, and the file is then closed.
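
The queue-per-partition write path could be sketched roughly as follows. This is only an illustration under assumptions: PartitionedBuffer is a hypothetical class, a java.util.function.Function stands in for the user-defined partitioning function, the fixed file name "part-0" is made up, and the local file system stands in for HDFS and for the per-queue output format.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: one in-memory queue per partition, flushed on demand.
final class PartitionedBuffer<T> {
    private final Path baseDir;
    private final Function<T, String> partitioner;
    private final Map<String, List<T>> queues = new HashMap<>();

    PartitionedBuffer(Path baseDir, Function<T, String> partitioner) {
        this.baseDir = baseDir;
        this.partitioner = partitioner;
    }

    // Extract the partition, then add the record to the queue for that partition.
    void add(T record) {
        queues.computeIfAbsent(partitioner.apply(record), k -> new ArrayList<>())
              .add(record);
    }

    // On flushing: open each partition's file, append its queue, close the file.
    void flush() throws IOException {
        for (Map.Entry<String, List<T>> entry : queues.entrySet()) {
            // The partition value becomes part of the path.
            Path dir = baseDir.resolve(entry.getKey());
            Files.createDirectories(dir);
            try (BufferedWriter out = Files.newBufferedWriter(
                    dir.resolve("part-0"), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
                for (T record : entry.getValue()) {
                    out.write(String.valueOf(record));
                    out.newLine();
                }
            }
        }
        queues.clear();
    }
}
```

Note that records sitting in a queue are lost on failure unless the queues themselves are part of the checkpointed state, which this sketch leaves out.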

Does this sound reasonable?




Re: Flink to ingest from Kafka to HDFS?

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, this seems like a good approach. We should probably not reuse the
KeySelector for this, but rather use a more use-case-specific type of
function that can create a desired filename from an input object.
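
To make the idea concrete, such a function could look roughly like the sketch below. All names here (FileNameFunction, Event, DailyFileName) are hypothetical and chosen only for illustration; none of them is an existing Flink interface.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical interface: maps an input element to the file it should go to.
// Serializable so that it could be shipped to the workers with the sink.
interface FileNameFunction<T> extends Serializable {
    String fileNameFor(T element);
}

// Example element type, assumed for this sketch only.
final class Event {
    final long timestampMillis;
    final String payload;

    Event(long timestampMillis, String payload) {
        this.timestampMillis = timestampMillis;
        this.payload = payload;
    }
}

// Example implementation: one file per UTC day, derived from the event time.
final class DailyFileName implements FileNameFunction<Event> {
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    @Override
    public String fileNameFor(Event event) {
        return "events-" + DAY.format(Instant.ofEpochMilli(event.timestampMillis));
    }
}
```

Compared to a generic KeySelector, the return type is fixed to a file name, so the sink does not have to guess how to turn an arbitrary key into a path.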

This is only the first part, though. The hard bit would be implementing
rolling files and also integrating it with Flink's checkpointing mechanism.
For integration with checkpointing you could maybe use "staging-files": all
elements are put into a staging file. Then, when the notification about a
completed checkpoint is received, the contents of this file would be moved
(or appended) to the actual destination.
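
A rough sketch of the staging-file idea, with the assumptions stated up front: StagingFileWriter and its method onCheckpointComplete() are hypothetical names, the local file system stands in for HDFS, and the method is meant to be called from wherever the checkpoint-complete notification arrives.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: records only become visible in the destination
// after a checkpoint has been confirmed.
final class StagingFileWriter {
    private final Path staging;
    private final Path destination;

    StagingFileWriter(Path staging, Path destination) {
        this.staging = staging;
        this.destination = destination;
    }

    // All elements are first put into the staging file.
    void write(String record) throws IOException {
        Files.write(staging, (record + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // To be called when the notification about a completed checkpoint arrives:
    // append the staged contents to the destination and clear the staging file.
    void onCheckpointComplete() throws IOException {
        if (!Files.exists(staging)) {
            return; // nothing staged since the last notification
        }
        Files.write(destination, Files.readAllBytes(staging),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        Files.delete(staging);
    }
}
```

This is deliberately simplified. It promotes everything in the staging file, including records written between taking the checkpoint and receiving the notification, and the append-then-delete step is not atomic. A real sink would probably need one staging file per checkpoint and a way to detect a half-finished move on recovery.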

Do you have any ideas about the rolling files/checkpointing?


Re: Flink to ingest from Kafka to HDFS?

Posted by Rico Bergmann <in...@ricobergmann.de>.
I'm thinking about implementing this. 

After looking into the Flink code, I would basically subclass FileOutputFormat in, let's say, KeyedFileOutputFormat, which gets an additional KeySelector object. The string that the KeySelector returns is then appended to the path in the file system.

Do you think this is a good approach?

Greets. Rico. 




Re: Flink to ingest from Kafka to HDFS?

Posted by Stephan Ewen <se...@apache.org>.
If you are up for it, this would be a very nice addition to Flink, a great
contribution :-)


Re: Flink to ingest from Kafka to HDFS?

Posted by Stephan Ewen <se...@apache.org>.
Hi!

This should definitely be possible in Flink. Pretty much exactly like you
describe it.

You need a custom version of the HDFS sink with some logic for deciding when
to roll over to a new file.
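
As an illustration only, the roll-over decision could be isolated in a small helper like the one below. RollingPolicy, its two thresholds and the "part-N" naming are assumptions made for this sketch and are not taken from Flink or from the pull request mentioned in this thread.

```java
/** Hypothetical helper, not part of Flink: decides when to start a new part file. */
class RollingPolicy {
    private final long maxBytes;      // roll once the open file reaches this size
    private final long maxAgeMillis;  // roll once the open file has been open this long

    RollingPolicy(long maxBytes, long maxAgeMillis) {
        this.maxBytes = maxBytes;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** True once the open file is large enough or old enough to be closed. */
    boolean shouldRoll(long bytesWritten, long openedAtMillis, long nowMillis) {
        return bytesWritten >= maxBytes || nowMillis - openedAtMillis >= maxAgeMillis;
    }

    /** Name of the part file with the given index (assumed naming scheme). */
    static String partFileName(int partIndex) {
        return "part-" + partIndex;
    }
}
```

The sink would call shouldRoll before each write and, when it returns true, close the current file and open the one named by partFileName with the next index.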

You can also make the sink "exactly once" by integrating it with the
checkpointing. For that, you would probably need to keep the current path
and output stream offsets as of the last checkpoint, so you can resume from
that offset and overwrite records to avoid duplicates. If that is not
possible, you would probably buffer records between checkpoints and only
write on checkpoints.
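
A rough sketch of the first variant (remember the offset at the checkpoint, cut the file back to it on recovery) is shown below in plain Java against a local file. OffsetTrackingWriter and snapshotOffset are hypothetical names, nothing here is Flink API, and it assumes the file system supports truncation, which may not hold for a given HDFS version.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch, not Flink API: the checkpointed state is (path, valid length). */
class OffsetTrackingWriter implements AutoCloseable {
    private final FileChannel channel;

    /** Opens the file and discards everything after validLength (0 for a fresh file). */
    OffsetTrackingWriter(Path path, long validLength) throws IOException {
        channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        channel.truncate(validLength); // drop bytes written after the last checkpoint
        channel.position(validLength); // resume writing at the checkpointed offset
    }

    /** Writes one record followed by a newline. */
    void write(String record) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /** Called when a checkpoint is taken: flush, then return the offset to store. */
    long snapshotOffset() throws IOException {
        channel.force(false);
        return channel.position();
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

On recovery the sink would reopen the file with the stored offset, and the replayed records would then overwrite whatever had been written after the checkpoint. Where truncation is not available, the buffering variant described above is the fallback.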

Greetings,
Stephan


