Posted to user@flink.apache.org by Johan Rask <jo...@flapsdown.se> on 2019/11/17 12:42:58 UTC

HDFS -> HDFS ends up with .inprogress files

Hi!

I am quite new to Apache Flink, but we have been evaluating it for some
weeks to read from Kafka, transform the data and write it to HDFS. Kafka to
HDFS with exactly-once configured works as expected, but when we replace
the Kafka source with an HDFS source, the files are left in .inprogress.

We first saw the same thing with Kafka as well, but after configuring
checkpointing properly we got the expected results.

We simply changed the Kafka source to an HDFS source and kept streaming
mode, even though this is obviously a bounded data set, so there might be
some issue with this that we do not understand.
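One way to frame the suspected bounded-source issue: the StreamingFileSink documentation describes part files moving from in-progress to pending (when they are rolled) and only to finished when a checkpoint completes. The toy model below is a hypothetical sketch, not Flink code (the class `PartFileLifecycle` and its methods are made up for illustration). It assumes that once the bounded source has finished, no further checkpoint completes. Under that assumption, any part that was still in-progress or pending at that point would keep its hidden .inprogress name.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical class, not Flink code): tracks part files
// through the in-progress -> pending -> finished states described in the
// StreamingFileSink documentation.
public class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final List<State> parts = new ArrayList<>();
    private int current = -1; // index of the open (in-progress) part, or -1

    // Writing a record opens a new in-progress part if none is open.
    void write() {
        if (current < 0) {
            parts.add(State.IN_PROGRESS);
            current = parts.size() - 1;
        }
    }

    // Rolling (size/time policy) closes the open part: it becomes pending,
    // but on disk it still has its hidden ".inprogress" name.
    void roll() {
        if (current >= 0) {
            parts.set(current, State.PENDING);
            current = -1;
        }
    }

    // Simplification: a completed checkpoint commits every pending part
    // (the real sink tracks which checkpoint each pending file belongs to).
    void checkpointCompleted() {
        parts.replaceAll(s -> s == State.PENDING ? State.FINISHED : s);
    }

    long count(State s) {
        return parts.stream().filter(p -> p == s).count();
    }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        sink.write(); sink.roll(); sink.checkpointCompleted(); // part 0 committed
        sink.write(); sink.roll();                             // part 1 pending
        sink.write();                                          // part 2 in progress
        // Assumption: the bounded source ends here and no further checkpoint completes.
        System.out.println("finished=" + sink.count(State.FINISHED)
                + " pending=" + sink.count(State.PENDING)
                + " in-progress=" + sink.count(State.IN_PROGRESS));
    }
}
```

Running `main` prints `finished=1 pending=1 in-progress=1`. In this model only the part covered by a completed checkpoint gets committed, and the other two would stay hidden.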

Any help with this is highly appreciated!

Regards /Johan Rask

I have created a small gist of our program here
https://gist.github.com/jrask/ef4a8531b0563f1420ce276e7b0f59ce

I have also copied the gist below.

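import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
// JsonObject, Parser and EventTimeDateTimeBuckerAssigner are project classes (not shown).

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Checkpoint every minute; StreamingFileSink finalizes part files only on completed checkpoints.
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new FsStateBackend("hdfs://<server><dir>/checkpoints", true));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

    StreamingFileSink<JsonObject> hdfsSink = StreamingFileSink
            .<JsonObject>forRowFormat(new Path("hdfs://<server>/<dir>"), new SimpleStringEncoder<>("UTF-8"))
            // yyyy = calendar year (YYYY would be the week-based year)
            .withBucketAssigner(new EventTimeDateTimeBuckerAssigner<>("'/year'=yyyy/'month'=MM/'day'=dd/'hour'=HH"))
            .build();

    // Bounded file source, but the job still runs in streaming mode.
    env.readTextFile("hdfs://<server><dir><file>")
        .map(Parser::parse)
        .addSink(hdfsSink);

    env.execute("some-pipeline");
}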
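Separately from the .inprogress issue, the year field in the bucket pattern may deserve a look. This assumes that EventTimeDateTimeBuckerAssigner (not shown) formats timestamps with java.time's DateTimeFormatter, as Flink's own DateTimeBucketAssigner does. In that case `YYYY` means the week-based year while `yyyy` means the calendar year, so events from the last days of December can land under the following year's directory. The following is a minimal standalone check. `BucketPatternCheck` and `bucketFor` are hypothetical names, and UTC and `Locale.US` are assumptions chosen for the example.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Standalone check of the bucket path pattern, assuming the (not shown)
// bucket assigner formats event timestamps with java.time's DateTimeFormatter.
public class BucketPatternCheck {
    static String bucketFor(long epochMillis, String pattern) {
        DateTimeFormatter formatter = DateTimeFormatter
                .ofPattern(pattern, Locale.US)   // locale decides week rules for "YYYY"
                .withZone(ZoneOffset.UTC);       // UTC chosen for the example
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2019-12-31T23:00:00Z").toEpochMilli();
        // "YYYY" is the week-based year; "yyyy" is the calendar year.
        System.out.println(bucketFor(ts, "'/year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"));
        System.out.println(bucketFor(ts, "'/year'=yyyy/'month'=MM/'day'=dd/'hour'=HH"));
    }
}
```

For the same 2019-12-31 timestamp, the first line shows `/year=2020/month=12/day=31/hour=23` and the second shows `/year=2019/month=12/day=31/hour=23`.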


HDFS -> HDFS results in the following listing, whereas with Kafka as the
source it works properly.

-rw-rw----+  3 someuser supergroup   87792789 2019-11-16 20:57 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-62.inprogress.8f9c6104-4c6c-4eee-8650-dd5d1d12d668
-rw-rw----+  3 someuser supergroup   64696413 2019-11-16 20:58 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-63.inprogress.42589a04-601b-496d-ae20-7db1d56089dc

... rest is removed for clarity