Posted to user@flink.apache.org by Josh Lemer <jo...@gmail.com> on 2018/04/04 18:50:32 UTC

Bucketing Sink does not complete files, when source is from a collection

Hello, I was wondering if I could get some pointers on what I'm doing wrong
here. I posted this on Stack Overflow
<https://stackoverflow.com/questions/49655460/flink-does-not-checkpoint-and-bucketingsink-leaves-files-in-pending-state-when>,
but I thought I'd also ask here.

I'm trying to generate some test data from a collection and write it to S3.
Flink doesn't seem to do any checkpointing at all when I do this, but it
does checkpoint when the source reads from S3.

For example, this DOES checkpoint and leaves output files in a completed
state:

```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(
  new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// Monitor the input directory continuously, checking for new files every 5 s.
val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = 5000L
  )
}

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```

Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state
even after the job has finished:


```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(
  new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// Bounded source: the job finishes as soon as all 100 elements are emitted.
val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```
Is this a bug in Flink, or is it something I'm doing wrong? Thank you!

Re: Bucketing Sink does not complete files, when source is from a collection

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Josh,

You are right, FLINK-2646 is related to the problem of non-finalized
files. If we could distinguish why close() is called, we could do a proper
clean-up when the job terminates because all data was processed.

Right now, the source and sink interfaces of the DataStream API are not
really designed for finite / bounded data.
In order to improve the support for bounded and unbounded data, we have
some plans to design unified interfaces that can handle both cases well.
This effort should also solve cases like the one that you described.

Best, Fabian

2018-04-04 21:51 GMT+02:00 joshlemer <jo...@gmail.com>:

> Actually sorry, I have found that this is most likely a manifestation of
> https://issues.apache.org/jira/browse/FLINK-2646 as discussed elsewhere on
> the mailing list. That is, in the second example ("fromCollection") the
> entire stream ends before a checkpoint is made. Let's hope this is fixed
> some day :-)

Re: Bucketing Sink does not complete files, when source is from a collection

Posted by joshlemer <jo...@gmail.com>.
Actually sorry, I have found that this is most likely a manifestation of
https://issues.apache.org/jira/browse/FLINK-2646 as discussed elsewhere on
the mailing list. That is, in the second example ("fromCollection") the
entire stream ends before a checkpoint is made. Let's hope this is fixed
some day :-)
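
One possible workaround, sketched below, is to keep the bounded source alive long enough for at least one checkpoint to fire before the job finishes, so the BucketingSink has a chance to finalize its files. This is only a sketch and not from the thread: the class name `CollectionWithGraceSource` and its parameters are hypothetical, and it does not fix FLINK-2646 itself.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical workaround: emit the collection, then idle for a grace
// period so the checkpoint interval (2000 ms in the examples above) can
// fire at least once before the source finishes.
class CollectionWithGraceSource(items: Seq[String], graceMillis: Long)
    extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Emit every element under the checkpoint lock, as the contract requires.
    items.foreach { item =>
      ctx.getCheckpointLock.synchronized {
        ctx.collect(item)
      }
    }
    // Idle until the grace period elapses or the job is cancelled.
    val deadline = System.currentTimeMillis() + graceMillis
    while (running && System.currentTimeMillis() < deadline) {
      Thread.sleep(100L)
    }
  }

  override def cancel(): Unit = running = false
}
```

Used as `env.addSource(new CollectionWithGraceSource((1 to 100).map(_.toString), 10000L))` in place of `env.fromCollection(...)`, this gives the 2000 ms checkpoint interval room to complete a checkpoint or two before close() is called.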



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/