Posted to user@flink.apache.org by Fritz Budiyanto <fb...@icloud.com> on 2017/10/18 02:55:02 UTC

Parallelism, registerEventTimeTimer and watermark problem

Hi All,

If I run with high parallelism and use a ProcessFunction to call registerEventTimeTimer, the timer never fires.
After debugging, I found that the watermark isn't updated because I have a keyBy right after assignTimestampsAndWatermarks.
And if I put assignTimestampsAndWatermarks right after the keyBy, an exception is thrown.

val contractFlow = enrichedFlow
  .keyBy(f => f.fiveTupleKey)
  .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) // <<<<< placed right after keyBy
  .process(new FlowContractStitcherProcess)
  .name("contractStitcher")

at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)


Any idea how to solve my problem? How do I update the watermark after keyBy?

Would I hit scaling issues with a large number of timers if I used registerProcessingTimeTimer instead? I'm using event time throughout the pipeline. Could mixing processing-time timers with event time cause problems down the line?

--
Fritz

Re: Parallelism, registerEventTimeTimer and watermark problem

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Fritz,

If the watermark is not updating this usually means that one of the input partitions (if you're using Kafka) is not carrying data. In that case, the watermark/timestamp assigner will have no data on which to base an updated watermark. For such use cases I recently implemented a special watermark/timestamp assigner that will notice if a stream is idle and will then artificially advance the watermark. The code for this is available here: https://github.com/aljoscha/flink/commit/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f
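
To illustrate why a single silent partition is enough to stall event-time timers, here is a minimal sketch in plain Scala. It is a simplified model and not Flink code: WatermarkSketch, combined and timerFires are hypothetical names, and the only behaviour it assumes is that an operator with several inputs forwards the minimum of the watermarks it has seen on those inputs, and that an event-time timer fires once that watermark reaches the timer's timestamp.

```scala
// Simplified model, not Flink's actual classes.
object WatermarkSketch {

  /** Watermark an operator would forward: the minimum over its input channels. */
  def combined(channelWatermarks: Seq[Long]): Long =
    if (channelWatermarks.isEmpty) Long.MinValue else channelWatermarks.min

  /** An event-time timer fires once the combined watermark reaches its timestamp. */
  def timerFires(timerTimestamp: Long, channelWatermarks: Seq[Long]): Boolean =
    combined(channelWatermarks) >= timerTimestamp

  def main(args: Array[String]): Unit = {
    // Third input never received data, so its watermark is still Long.MinValue.
    val stalled = Seq(5000L, 7000L, Long.MinValue)
    println(timerFires(4000L, stalled)) // false: the minimum is held back

    // Same inputs after the idle one has been artificially advanced to 6000.
    val advanced = Seq(5000L, 7000L, 6000L)
    println(timerFires(4000L, advanced)) // true: the minimum is now 5000
  }
}
```

With higher parallelism there are more inputs that can sit empty, which would match the timers firing at low parallelism but not at high parallelism.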

Does this apply to your case?

Best,
Aljoscha

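P.S. The exception is thrown because using timers/state is only allowed in an operation that directly follows a keyBy(). In your snippet, assignTimestampsAndWatermarks() sits between keyBy() and process(), so process() no longer runs on a keyed stream. Assigning timestamps and watermarks before the keyBy() avoids this.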

> On 18. Oct 2017, at 05:23, Fritz Budiyanto <fb...@icloud.com> wrote:
> 
> Sorry, I forgot to paste the exception that was thrown:
> 
> 10/17/2017 20:21:30	dropDetection -> (aggFlowDropDetectPrintln -> Sink: Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) switched to CANCELED 
> 20:21:30,244 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state FAILING to FAILED.
> java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
> 
> 
> --
> Fritz


Re: Parallelism, registerEventTimeTimer and watermark problem

Posted by Fritz Budiyanto <fb...@icloud.com>.
Sorry, I forgot to paste the exception that was thrown:

10/17/2017 20:21:30	dropDetection -> (aggFlowDropDetectPrintln -> Sink: Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) switched to CANCELED 
20:21:30,244 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state FAILING to FAILED.
java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:151)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:115)
	at FlowContractStitcherProcess.endState$lzycompute(FlowContractResolver.scala:30)
	at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)


--
Fritz

