Posted to user@flink.apache.org by Vishal Sharma <vi...@grab.com> on 2019/05/28 02:26:33 UTC

[External] Flink StreamingFileSink not ingesting to S3 when checkpointing is disabled

Hello everyone,

I want to use AWS S3 as the sink for a data stream in Flink. I am using the
StreamingFileSink class to create the sink.

I don't need checkpointing for my job, but when I disable checkpointing,
data is no longer written to S3.

Case 1: checkpointing enabled
When checkpointing is enabled, the data is successfully written to the
specified S3 path.

Case 2: checkpointing disabled
When checkpointing is disabled, the data is not written to S3.

I ran the job multiple times and got the same result every time. I see this
on my local machine as well as on a Kubernetes cluster.


Following is the code I tried, using a bounded stream:

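import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.{PartFileInfo, RollingPolicy, StreamingFileSink}
import org.apache.flink.streaming.api.scala._

object FlinkTestJob {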

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // with checkpointing enabled
    env.enableCheckpointing(100)

    // Bounded test source
    val streamStrings: Seq[String] =
      Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")

    val testStream = env.fromCollection(streamStrings)

    val rollingPolicy = new RollingPolicy[String, String] {

      override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
        partFileState.getSize > 1

      override def shouldRollOnEvent(
          partFileState: PartFileInfo[String],
          element: String): Boolean = true

      override def shouldRollOnProcessingTime(
          partFileState: PartFileInfo[String],
          currentTime: Long): Boolean = true
    }

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(rollingPolicy)
      .build()

    testStream.addSink(sink)
    env.execute("test-job")
  }
}


When I write to S3 using writeAsText("s3a://testbucket/sink") instead of
StreamingFileSink, it works fine regardless of whether checkpointing is
enabled.

Flink version: 1.8.0
I want to understand the relation between checkpointing and
StreamingFileSink.

- Thanks



Re: [External] Flink StreamingFileSink not ingesting to S3 when checkpointing is disabled

Posted by Timothy Victor <vi...@gmail.com>.
You must have checkpointing enabled to use the StreamingFileSink. The
feature relies on checkpointing to achieve exactly-once semantics.

>> This is integrated with the checkpointing mechanism to provide exactly
>> once semantics.

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html
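
To make the dependency concrete, here is a toy model of the part-file
lifecycle as the Flink documentation describes it: a part file is first
in-progress, becomes pending when it is rolled, and is only moved to finished
(and so becomes visible output) when a checkpoint completes. This is a sketch
under that assumption and not Flink code: ToySink, PartFile, roll, and the
other names are hypothetical and exist only for illustration.

```scala
// Toy model only: none of these types come from Flink.
object PartFileLifecycleSketch {

  sealed trait State
  case object InProgress extends State
  case object Pending extends State
  case object Finished extends State

  final case class PartFile(id: Int, records: Vector[String], state: State)

  final class ToySink {
    private var files = Vector.empty[PartFile]
    private var nextId = 0

    // Append a record to the open in-progress file, opening one if needed.
    def write(record: String): Unit =
      files.indexWhere(_.state == InProgress) match {
        case -1 =>
          files = files :+ PartFile(nextId, Vector(record), InProgress)
          nextId += 1
        case i =>
          val f = files(i)
          files = files.updated(i, f.copy(records = f.records :+ record))
      }

    // Rolling closes the in-progress file but does not publish it.
    def roll(): Unit =
      files = files.map(f => if (f.state == InProgress) f.copy(state = Pending) else f)

    // Only a completed checkpoint promotes pending files to finished.
    def notifyCheckpointComplete(): Unit =
      files = files.map(f => if (f.state == Pending) f.copy(state = Finished) else f)

    // Records a downstream reader could see: those in finished files.
    def visibleRecords: Vector[String] =
      files.filter(_.state == Finished).flatMap(_.records)
  }

  def run(checkpointing: Boolean): Vector[String] = {
    val sink = new ToySink
    Seq("test1", "test2", "test3").foreach { r =>
      sink.write(r)
      sink.roll() // mirrors a rolling policy that rolls on every event
    }
    if (checkpointing) sink.notifyCheckpointComplete()
    sink.visibleRecords
  }

  def main(args: Array[String]): Unit = {
    println(s"checkpointing on:  ${run(checkpointing = true)}")
    println(s"checkpointing off: ${run(checkpointing = false)}")
  }
}
```

In this model the run without a checkpoint leaves every file pending, so no
records are visible. That matches the symptom in the question: the rolling
policy closes the files, but nothing ever commits them. writeAsText does not
go through this lifecycle, which would explain why it is unaffected.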


Tim
