You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shikhar <sh...@schmizz.net> on 2016/03/04 03:08:43 UTC
Periodic actions
I am trying to have my job also run a periodic action by using a custom
source that emits a dummy element periodically and a sink that executes the
callback, as shown in the code below. However as soon as I start the job and
check the state in the JobManager UI this particular Sink->Source combo is
in state 'FINISHED' I know based on logging that the sink never received any
elements. What am I doing wrong?
```scala
env
.addSource(PeriodicSource(1.minutes))
.addSink { _ => foo() }
```
```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import scala.concurrent.duration.FiniteDuration
case class PeriodicSource(interval: FiniteDuration) extends
SourceFunction[Unit] {
@volatile private var active = false
override def run(ctx: SourceContext[Unit]): Unit = {
while (active) {
sleep()
if (active) {
ctx.getCheckpointLock
ctx.collect(Unit)
}
}
}
override def cancel(): Unit = {
active = false
}
private def sleep(): Unit = {
val startTimeMs = System.currentTimeMillis()
val desiredSleepMs = interval.toMillis
do {
Thread.sleep(math.min(desiredSleepMs, 100))
} while (active && (System.currentTimeMillis() - startTimeMs) <
desiredSleepMs)
}
}
```
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Periodic-actions-tp5290.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Periodic actions
Posted by shikhar <sh...@schmizz.net>.
Wow that's embarassing :D That was indeed the issue
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Periodic-actions-tp5290p5304.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Periodic actions
Posted by Chesnay Schepler <ch...@apache.org>.
could the problem be as simple as var active being never true?
On 04.03.2016 03:08, shikhar wrote:
> I am trying to have my job also run a periodic action by using a custom
> source that emits a dummy element periodically and a sink that executes the
> callback, as shown in the code below. However as soon as I start the job and
> check the state in the JobManager UI this particular Sink->Source combo is
> in state 'FINISHED' I know based on logging that the sink never received any
> elements. What am I doing wrong?
>
> ```scala
> env
> .addSource(PeriodicSource(1.minutes))
> .addSink { _ => foo() }
> ```
>
> ```scala
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import
> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>
> import scala.concurrent.duration.FiniteDuration
>
> case class PeriodicSource(interval: FiniteDuration) extends
> SourceFunction[Unit] {
> @volatile private var active = false
>
> override def run(ctx: SourceContext[Unit]): Unit = {
> while (active) {
> sleep()
> if (active) {
> ctx.getCheckpointLock
> ctx.collect(Unit)
> }
> }
> }
>
> override def cancel(): Unit = {
> active = false
> }
>
> private def sleep(): Unit = {
> val startTimeMs = System.currentTimeMillis()
> val desiredSleepMs = interval.toMillis
> do {
> Thread.sleep(math.min(desiredSleepMs, 100))
> } while (active && (System.currentTimeMillis() - startTimeMs) <
> desiredSleepMs)
> }
> }
> ```
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Periodic-actions-tp5290.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>