Posted to user@flink.apache.org by Si-li Liu <un...@gmail.com> on 2020/07/02 08:51:48 UTC

Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

I'm using Flink 1.9 on Mesos and I'm trying to use my own trigger and evictor.
The state is stored in memory.

input.setParallelism(processParallelism)
        .assignTimestampsAndWatermarks(new UETimeAssigner)
        .keyBy(_.key)
        .window(TumblingEventTimeWindows.of(Time.minutes(20)))
        .trigger(new MyTrigger)
        .evictor(new MyEvictor)
        .process(new MyFunction).setParallelism(aggregateParallelism)
        .addSink(kafkaSink).setParallelism(sinkParallelism)
        .name("kafka-record-sink")

The exception stack is below; could anyone help with this? Thanks!

java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
    at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more

-- 
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Si-li Liu <un...@gmail.com>.
Someone told me that this issue may be Mesos-specific. I'm still fairly new to
Flink, and I dug into the code but couldn't reach a conclusion. What I really
want is a better join window that emits the result and deletes it from the
window state immediately once a join succeeds. Is there any other way to do
this? Thanks!
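
For reference, one window-free way to get that behaviour is to key both input streams by the join key, connect them, and do the join in a CoProcessFunction: keep the unmatched side in keyed state, emit as soon as the other side arrives, and clear the state right away. The sketch below is only an illustration of that idea against the Flink 1.9 Scala API; EagerJoin, buildOutput and the 20-minute cleanup timer are made-up placeholders, not code from this thread.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

class EagerJoin extends CoProcessFunction[RawLog, RawLog, OutputLog] {

  private var bidState: ValueState[RawLog] = _

  override def open(parameters: Configuration): Unit = {
    bidState = getRuntimeContext.getState(
      new ValueStateDescriptor[RawLog]("bid", classOf[RawLog]))
  }

  // BID side: remember the element and register a cleanup timer 20 minutes later.
  override def processElement1(bid: RawLog,
                               ctx: CoProcessFunction[RawLog, RawLog, OutputLog]#Context,
                               out: Collector[OutputLog]): Unit = {
    bidState.update(bid)
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 20 * 60 * 1000L)
  }

  // SHOW side: if the BID is already there, emit immediately and drop the state.
  override def processElement2(show: RawLog,
                               ctx: CoProcessFunction[RawLog, RawLog, OutputLog]#Context,
                               out: Collector[OutputLog]): Unit = {
    if (bidState.value() != null) {
      out.collect(buildOutput(bidState.value(), show)) // placeholder join logic
      bidState.clear()
    }
  }

  // Garbage-collect BIDs that never joined within 20 minutes.
  override def onTimer(timestamp: Long,
                       ctx: CoProcessFunction[RawLog, RawLog, OutputLog]#OnTimerContext,
                       out: Collector[OutputLog]): Unit = {
    bidState.clear()
  }

  private def buildOutput(bid: RawLog, show: RawLog): OutputLog = ??? // placeholder
}

// Wiring (also a sketch):
// bidStream.connect(showStream).keyBy(_.key, _.key).process(new EagerJoin).addSink(kafkaSink)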

Congxian Qiu <qc...@gmail.com> wrote on Sat, Jul 11, 2020 at 3:20 PM:

> Hi Si-li
>
> Thanks for the notice.
> I just want to double-check: has the original problem been solved? I ask because
> the created issue FLINK-18464 has been closed with the resolution "can
> not reproduce". Am I missing something here?
>
> Best,
> Congxian
>
>
> Si-li Liu <un...@gmail.com> 于2020年7月10日周五 下午6:06写道:
>
>> Sorry
>>
>> I can't reproduce it with reduce/aggregate/fold/apply and due to some
>> limitations in my working environment, I can't use flink 1.10 or 1.11.
>>
>> Congxian Qiu <qc...@gmail.com> 于2020年7月5日周日 下午6:21写道:
>>
>>> Hi
>>>
>>> First, could you please check whether this problem is still there if you use
>>> Flink 1.10 or 1.11?
>>>
>>> It seems strange: from the error message, the error occurs when trying to
>>> convert a non-window state (VoidNamespace) to a window state (the serializer
>>> is the window-state serializer, but the state itself is non-window state).
>>> Could you please try replacing MyFunction with a reduce/aggregate/fold/apply()
>>> function to see what happens? This is just to narrow down the problem.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Si-li Liu <un...@gmail.com> 于2020年7月3日周五 下午6:44写道:
>>>
>>>> Thanks for your help
>>>>
>>>> 1. I started the job from scratch, not a savepoint or externalized
>>>> checkpoint
>>>> 2. No job graph change
>>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>> 4. My Flink version is 1.9.1
>>>>
>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 下午4:49写道:
>>>>
>>>>> I still wasn't able to reproduce the issue.
>>>>>
>>>>> Can you also clarify:
>>>>> - Are you starting the job from a savepoint or externalized
>>>>> checkpoint?
>>>>> - If yes, was the job graph changed?
>>>>> - What StreamTimeCharacteristic is set, if any?
>>>>> - What exact version of Flink do you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <un...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Thanks for your help.
>>>>>>
>>>>>> The checkpoint configuration is
>>>>>>
>>>>>> checkpoint.intervalMS=300000
>>>>>> checkpoint.timeoutMS=300000
>>>>>>
>>>>>> The error call stack is from the JM's log, and it happens on every cp.
>>>>>> I don't have a successful cp yet.
>>>>>>
>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五
>>>>>> 上午3:50写道:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the details.
>>>>>>> However, I was not able to reproduce the issue. I used parallelism
>>>>>>> level 4, the file system backend, and tried different timings for
>>>>>>> checkpointing, windowing and the source.
>>>>>>> Do you encounter this problem deterministically? Is it always the 1st
>>>>>>> checkpoint?
>>>>>>> What checkpointing interval do you use?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, this is our production code, so I had to modify it a little
>>>>>>>> (variable names and function names). I think the 3 classes I provide
>>>>>>>> here are enough.
>>>>>>>>
>>>>>>>> I'm trying to join two streams, but I don't want to use the default join
>>>>>>>> function, because I want to send the joined log immediately and remove it
>>>>>>>> from the window state immediately. And my window gap time is very long (20
>>>>>>>> minutes), so the window may be evaluated multiple times.
>>>>>>>>
>>>>>>>> class JoinFunction extends
>>>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>>>>
>>>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>>>   val invalidCounter = new LongCounter()
>>>>>>>>   val processCounter = new LongCounter()
>>>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>>>
>>>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def process(key: String,
>>>>>>>>                        ctx: Context,
>>>>>>>>                        logs: Iterable[RawLog],
>>>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>>>     if (ueState.value() != null) {
>>>>>>>>       processCounter.add(1L)
>>>>>>>>       val bid = ueState.value()
>>>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>>>       logs.foreach( log => {
>>>>>>>>         if (log.eventType == SHOW) {
>>>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>>>         }
>>>>>>>>       })
>>>>>>>>     } else {
>>>>>>>>       invalidCounter.add(1L)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>>>
>>>>>>>>   override def onElement(log: RawLog,
>>>>>>>>                          timestamp: Long,
>>>>>>>>                          window: TimeWindow,
>>>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>
>>>>>>>>     if (!firstSeen.value()) {
>>>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>>>       firstSeen.update(true)
>>>>>>>>     }
>>>>>>>>     val eventType = log.eventType
>>>>>>>>     if (eventType == BID) {
>>>>>>>>       ueState.update(log)
>>>>>>>>       TriggerResult.CONTINUE
>>>>>>>>     } else {
>>>>>>>>       if (ueState.value() == null) {
>>>>>>>>         TriggerResult.CONTINUE
>>>>>>>>       } else {
>>>>>>>>         TriggerResult.FIRE
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     if (timestamp == window.getEnd) {
>>>>>>>>       TriggerResult.PURGE
>>>>>>>>     }
>>>>>>>>     TriggerResult.CONTINUE
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>>>                                 window: TimeWindow,
>>>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>>     TriggerResult.CONTINUE
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def clear(window: TimeWindow,
>>>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>>     )
>>>>>>>>     ueState.clear()
>>>>>>>>
>>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>>     firstSeen.clear()
>>>>>>>>
>>>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>>>
>>>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                            size: Int,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>>>
>>>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>>                            size: Int,
>>>>>>>>                            window: TimeWindow,
>>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>>>     val iter = elements.iterator()
>>>>>>>>     while (iter.hasNext) {
>>>>>>>>       iter.next()
>>>>>>>>       iter.remove()
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>>>> 下午7:18写道:
>>>>>>>>
>>>>>>>>> Thanks for the clarification.
>>>>>>>>>
>>>>>>>>> Can you also share the code of other parts, particularly
>>>>>>>>> MyFunction?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The RocksDB backend has the same problem.
>>>>>>>>>>
>>>>>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>>>>>> 下午6:11写道:
>>>>>>>>>>
>>>>>>>>>>> Thanks for reporting this.
>>>>>>>>>>>
>>>>>>>>>>> Looks like the window namespace was replaced by VoidNamespace in the
>>>>>>>>>>> state entry.
>>>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>>>>> further investigate it.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Roman
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger
>>>>>>>>>>>> and evictor. The state is stored to memory.
>>>>>>>>>>>>
>>>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>>>>         .process(new
>>>>>>>>>>>> MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>>>>
>>>>>>>>>>>> And the exception stack is here, could anyone help with this?
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for
>>>>>>>>>>>> operator Window(TumblingEventTimeWindows(1200000), JoinTrigger,
>>>>>>>>>>>> JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink:
>>>>>>>>>>>> kafka-record-sink (2/5).
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>>>>>>>> StreamTask.java:1100)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>>>>>> ThreadPoolExecutor.java:1149)
>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>>>>>>> ThreadPoolExecutor.java:624)
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>>>>>>>>> ClassCastException:
>>>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:
>>>>>>>>>>>> 122)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.
>>>>>>>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer
>>>>>>>>>>>> .java:47)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>>>     ... 3 more
>>>>>>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>>>>>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>>> CopyOnWriteStateMapSnapshot.writeState(
>>>>>>>>>>>> CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>>> AbstractStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>>>> AbstractStateTableSnapshot.java:121)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>>> HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:
>>>>>>>>>>>> 191)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>>> HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:
>>>>>>>>>>>> 158)
>>>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable
>>>>>>>>>>>> .call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>>>     ... 5 more
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards
>>>>>>>>>>>>
>>>>>>>>>>>> Sili Liu
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards
>>>>>>>>>>
>>>>>>>>>> Sili Liu
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Si-li

Thanks for the notice.
I just want to double-check: has the original problem been solved? I ask because
the created issue FLINK-18464 has been closed with the resolution "can
not reproduce". Am I missing something here?

Best,
Congxian


Si-li Liu <un...@gmail.com> wrote on Fri, Jul 10, 2020 at 6:06 PM:

> Sorry
>
> I can't reproduce it with reduce/aggregate/fold/apply and due to some
> limitations in my working environment, I can't use flink 1.10 or 1.11.
>
> Congxian Qiu <qc...@gmail.com> 于2020年7月5日周日 下午6:21写道:
>
>> Hi
>>
>> First, could you please check whether this problem is still there if you use
>> Flink 1.10 or 1.11?
>>
>> It seems strange: from the error message, the error occurs when trying to
>> convert a non-window state (VoidNamespace) to a window state (the serializer
>> is the window-state serializer, but the state itself is non-window state).
>> Could you please try replacing MyFunction with a reduce/aggregate/fold/apply()
>> function to see what happens? This is just to narrow down the problem.
>>
>> Best,
>> Congxian
>>
>>
>> Si-li Liu <un...@gmail.com> 于2020年7月3日周五 下午6:44写道:
>>
>>> Thanks for your help
>>>
>>> 1. I started the job from scratch, not a savepoint or externalized
>>> checkpoint
>>> 2. No job graph change
>>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> 4. My Flink version is 1.9.1
>>>
>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 下午4:49写道:
>>>
>>>> I still wasn't able to reproduce the issue.
>>>>
>>>> Can you also clarify:
>>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>>> - If yes, was the job graph changed?
>>>> - What StreamTimeCharacteristic is set, if any?
>>>> - What exact version of Flink do you use?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <un...@gmail.com> wrote:
>>>>
>>>>> Hi, Thanks for your help.
>>>>>
>>>>> The checkpoint configuration is
>>>>>
>>>>> checkpoint.intervalMS=300000
>>>>> checkpoint.timeoutMS=300000
>>>>>
>>>>> The error callstack is from JM's log, which happened in every cp.
>>>>> Currently I don't have a success cp yet.
>>>>>
>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 上午3:50写道:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the details.
>>>>>> However, I was not able to reproduce the issue. I used parallelism
>>>>>> levels 4, file system backend and tried different timings for
>>>>>> checkpointing, windowing and source.
>>>>>> Do you encounter this problem deterministically, is it always 1st
>>>>>> checkpoint?
>>>>>> What checkpointing interval do you use?
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, this is our production code so I have to modify it a little bit,
>>>>>>> such as variable name and function name. I think 3 classes I provide here
>>>>>>> is enough.
>>>>>>>
>>>>>>> I try to join two streams, but I don't want to use the default join
>>>>>>> function, because I want to send the joined log immediately and remove it
>>>>>>> from window state immediately. And my window gap time is very long( 20
>>>>>>> minutes), so it maybe evaluate it multiple times.
>>>>>>>
>>>>>>> class JoinFunction extends
>>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>>>
>>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>>   val invalidCounter = new LongCounter()
>>>>>>>   val processCounter = new LongCounter()
>>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>>
>>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>>   }
>>>>>>>
>>>>>>>   override def process(key: String,
>>>>>>>                        ctx: Context,
>>>>>>>                        logs: Iterable[RawLog],
>>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>>     if (ueState.value() != null) {
>>>>>>>       processCounter.add(1L)
>>>>>>>       val bid = ueState.value()
>>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>>       logs.foreach( log => {
>>>>>>>         if (log.eventType == SHOW) {
>>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>>         }
>>>>>>>       })
>>>>>>>     } else {
>>>>>>>       invalidCounter.add(1L)
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>>
>>>>>>>   override def onElement(log: RawLog,
>>>>>>>                          timestamp: Long,
>>>>>>>                          window: TimeWindow,
>>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>
>>>>>>>     if (!firstSeen.value()) {
>>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>>       firstSeen.update(true)
>>>>>>>     }
>>>>>>>     val eventType = log.eventType
>>>>>>>     if (eventType == BID) {
>>>>>>>       ueState.update(log)
>>>>>>>       TriggerResult.CONTINUE
>>>>>>>     } else {
>>>>>>>       if (ueState.value() == null) {
>>>>>>>         TriggerResult.CONTINUE
>>>>>>>       } else {
>>>>>>>         TriggerResult.FIRE
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>>                            window: TimeWindow,
>>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     if (timestamp == window.getEnd) {
>>>>>>>       TriggerResult.PURGE
>>>>>>>     }
>>>>>>>     TriggerResult.CONTINUE
>>>>>>>   }
>>>>>>>
>>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>>                                 window: TimeWindow,
>>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>>     TriggerResult.CONTINUE
>>>>>>>   }
>>>>>>>
>>>>>>>   override def clear(window: TimeWindow,
>>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>>     )
>>>>>>>     ueState.clear()
>>>>>>>
>>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>>     firstSeen.clear()
>>>>>>>
>>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>>
>>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>                            size: Int,
>>>>>>>                            window: TimeWindow,
>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>>
>>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>>                            size: Int,
>>>>>>>                            window: TimeWindow,
>>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>>     val iter = elements.iterator()
>>>>>>>     while (iter.hasNext) {
>>>>>>>       iter.next()
>>>>>>>       iter.remove()
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>>> 下午7:18写道:
>>>>>>>
>>>>>>>> Thanks for the clarification.
>>>>>>>>
>>>>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Rocksdb backend has the same problem
>>>>>>>>>
>>>>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>>>>> 下午6:11写道:
>>>>>>>>>
>>>>>>>>>> Thanks for reporting this.
>>>>>>>>>>
>>>>>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>>>>>> state entry.
>>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>>>> further investigate it.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Roman
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>>>>>>>> evictor. The state is stored to memory.
>>>>>>>>>>>
>>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>>>         .process(new
>>>>>>>>>>> MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>>>
>>>>>>>>>>> And the exception stack is here, could anyone help with this?
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for
>>>>>>>>>>> operator Window(TumblingEventTimeWindows(1200000), JoinTrigger,
>>>>>>>>>>> JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink:
>>>>>>>>>>> kafka-record-sink (2/5).
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>>>>>>> StreamTask.java:1100)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>>>>> ThreadPoolExecutor.java:1149)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>>>>>> ThreadPoolExecutor.java:624)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>>>>>>>> ClassCastException:
>>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:
>>>>>>>>>>> 122)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.
>>>>>>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:
>>>>>>>>>>> 47)
>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>>     ... 3 more
>>>>>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>>>>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>> CopyOnWriteStateMapSnapshot.writeState(
>>>>>>>>>>> CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>> AbstractStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>>> AbstractStateTableSnapshot.java:121)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>> HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:
>>>>>>>>>>> 191)
>>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>>> HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:
>>>>>>>>>>> 158)
>>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable
>>>>>>>>>>> .call(AsyncSnapshotCallable.java:75)
>>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>>     ... 5 more
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards
>>>>>>>>>>>
>>>>>>>>>>> Sili Liu
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Si-li Liu <un...@gmail.com>.
Sorry

I can't reproduce it with reduce/aggregate/fold/apply, and due to some
limitations in my working environment, I can't use Flink 1.10 or 1.11.

Congxian Qiu <qc...@gmail.com> wrote on Sun, Jul 5, 2020 at 6:21 PM:

> Hi
>
> First, could you please check whether this problem is still there if you use
> Flink 1.10 or 1.11?
>
> It seems strange: from the error message, the error occurs when trying to
> convert a non-window state (VoidNamespace) to a window state (the serializer
> is the window-state serializer, but the state itself is non-window state).
> Could you please try replacing MyFunction with a reduce/aggregate/fold/apply()
> function to see what happens? This is just to narrow down the problem.
>
> Best,
> Congxian
>
>
> Si-li Liu <un...@gmail.com> 于2020年7月3日周五 下午6:44写道:
>
>> Thanks for your help
>>
>> 1. I started the job from scratch, not a savepoint or externalized
>> checkpoint
>> 2. No job graph change
>> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> 4. My Flink version is 1.9.1
>>
>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 下午4:49写道:
>>
>>> I still wasn't able to reproduce the issue.
>>>
>>> Can you also clarify:
>>> - Are you starting the job from a savepoint or externalized checkpoint?
>>> - If yes, was the job graph changed?
>>> - What StreamTimeCharacteristic is set, if any?
>>> - What exact version of Flink do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <un...@gmail.com> wrote:
>>>
>>>> Hi, Thanks for your help.
>>>>
>>>> The checkpoint configuration is
>>>>
>>>> checkpoint.intervalMS=300000
>>>> checkpoint.timeoutMS=300000
>>>>
>>>> The error callstack is from JM's log, which happened in every cp.
>>>> Currently I don't have a success cp yet.
>>>>
>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 上午3:50写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the details.
>>>>> However, I was not able to reproduce the issue. I used parallelism
>>>>> levels 4, file system backend and tried different timings for
>>>>> checkpointing, windowing and source.
>>>>> Do you encounter this problem deterministically, is it always 1st
>>>>> checkpoint?
>>>>> What checkpointing interval do you use?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:
>>>>>
>>>>>> Hi, this is our production code so I have to modify it a little bit,
>>>>>> such as variable name and function name. I think 3 classes I provide here
>>>>>> is enough.
>>>>>>
>>>>>> I try to join two streams, but I don't want to use the default join
>>>>>> function, because I want to send the joined log immediately and remove it
>>>>>> from window state immediately. And my window gap time is very long( 20
>>>>>> minutes), so it maybe evaluate it multiple times.
>>>>>>
>>>>>> class JoinFunction extends
>>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>>
>>>>>>   var ueState: ValueState[RawLog] = _
>>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>>   val invalidCounter = new LongCounter()
>>>>>>   val processCounter = new LongCounter()
>>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>>
>>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>>     ueState = getRuntimeContext.getState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>     )
>>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>>   }
>>>>>>
>>>>>>   override def process(key: String,
>>>>>>                        ctx: Context,
>>>>>>                        logs: Iterable[RawLog],
>>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>>     if (ueState.value() != null) {
>>>>>>       processCounter.add(1L)
>>>>>>       val bid = ueState.value()
>>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>>       logs.foreach( log => {
>>>>>>         if (log.eventType == SHOW) {
>>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>>           sendToKafkaCounter.add(1L)
>>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>>         }
>>>>>>       })
>>>>>>     } else {
>>>>>>       invalidCounter.add(1L)
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>>
>>>>>>   override def onElement(log: RawLog,
>>>>>>                          timestamp: Long,
>>>>>>                          window: TimeWindow,
>>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>     )
>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>
>>>>>>     if (!firstSeen.value()) {
>>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>>       firstSeen.update(true)
>>>>>>     }
>>>>>>     val eventType = log.eventType
>>>>>>     if (eventType == BID) {
>>>>>>       ueState.update(log)
>>>>>>       TriggerResult.CONTINUE
>>>>>>     } else {
>>>>>>       if (ueState.value() == null) {
>>>>>>         TriggerResult.CONTINUE
>>>>>>       } else {
>>>>>>         TriggerResult.FIRE
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   override def onEventTime(timestamp: Long,
>>>>>>                            window: TimeWindow,
>>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     if (timestamp == window.getEnd) {
>>>>>>       TriggerResult.PURGE
>>>>>>     }
>>>>>>     TriggerResult.CONTINUE
>>>>>>   }
>>>>>>
>>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>>                                 window: TimeWindow,
>>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>>     TriggerResult.CONTINUE
>>>>>>   }
>>>>>>
>>>>>>   override def clear(window: TimeWindow,
>>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>>     )
>>>>>>     ueState.clear()
>>>>>>
>>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>>     firstSeen.clear()
>>>>>>
>>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>>
>>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>                            size: Int,
>>>>>>                            window: TimeWindow,
>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>>
>>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>>                            size: Int,
>>>>>>                            window: TimeWindow,
>>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>>>     val iter = elements.iterator()
>>>>>>     while (iter.hasNext) {
>>>>>>       iter.next()
>>>>>>       iter.remove()
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>> 下午7:18写道:
>>>>>>
>>>>>>> Thanks for the clarification.
>>>>>>>
>>>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Rocksdb backend has the same problem
>>>>>>>>
>>>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>>>> 下午6:11写道:
>>>>>>>>
>>>>>>>>> Thanks for reporting this.
>>>>>>>>>
>>>>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>>>>> state entry.
>>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>>> further investigate it.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>>>>>>> evictor. The state is stored to memory.
>>>>>>>>>>
>>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>>         .process(new
>>>>>>>>>> MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>>
>>>>>>>>>> And the exception stack is here, could anyone help with this?
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for
>>>>>>>>>> operator Window(TumblingEventTimeWindows(1200000), JoinTrigger,
>>>>>>>>>> JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink:
>>>>>>>>>> kafka-record-sink (2/5).
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>>>>>> StreamTask.java:1100)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>>>> ThreadPoolExecutor.java:1149)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>>>>> ThreadPoolExecutor.java:624)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>>>>>>> ClassCastException:
>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122
>>>>>>>>>> )
>>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>>     at org.apache.flink.streaming.api.operators.
>>>>>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:
>>>>>>>>>> 47)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>>     ... 3 more
>>>>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>>>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>> CopyOnWriteStateMapSnapshot.writeState(
>>>>>>>>>> CopyOnWriteStateMapSnapshot.java:114)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>> AbstractStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>> AbstractStateTableSnapshot.java:121)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>>>>>>> AsyncSnapshotCallable.java:75)
>>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>>     ... 5 more
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards
>>>>>>>>>>
>>>>>>>>>> Sili Liu
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

First, could you please check whether this problem is still there if you use
Flink 1.10 or 1.11?

It seems strange: from the error message, the error occurs when trying to
convert a non-window state (VoidNamespace) to a window state (the serializer
is the window-state serializer, but the state itself is non-window state).
Could you please try replacing MyFunction with a
reduce/aggregate/fold/apply()
function to see what happens? This is just to narrow down the problem.

Best,
Congxian
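
A minimal version of that narrowing-down test, assuming the same pipeline as in the original post, would keep the custom trigger and evictor and only swap the window function, e.g.:

input.setParallelism(processParallelism)
  .assignTimestampsAndWatermarks(new UETimeAssigner)
  .keyBy(_.key)
  .window(TumblingEventTimeWindows.of(Time.minutes(20)))
  .trigger(new MyTrigger)
  .evictor(new MyEvictor)
  .reduce((a, b) => b)  // placeholder reduce: just keeps the latest element per window
  .print()              // the real Kafka sink expects OutputLog, so print() is used only for this test

If checkpoints succeed with the reduce but still fail with the ProcessWindowFunction, that would point at the process-function path.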


Si-li Liu <un...@gmail.com> wrote on Fri, Jul 3, 2020 at 6:44 PM:

> Thanks for your help
>
> 1. I started the job from scratch, not a savepoint or externalized
> checkpoint
> 2. No job graph change
> 3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 4. My Flink version is 1.9.1
>
> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 下午4:49写道:
>
>> I still wasn't able to reproduce the issue.
>>
>> Can you also clarify:
>> - Are you starting the job from a savepoint or externalized checkpoint?
>> - If yes, was the job graph changed?
>> - What StreamTimeCharacteristic is set, if any?
>> - What exact version of Flink do you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <un...@gmail.com> wrote:
>>
>>> Hi, Thanks for your help.
>>>
>>> The checkpoint configuration is
>>>
>>> checkpoint.intervalMS=300000
>>> checkpoint.timeoutMS=300000
>>>
>>> The error callstack is from JM's log, which happened in every cp.
>>> Currently I don't have a success cp yet.
>>>
>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 上午3:50写道:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the details.
>>>> However, I was not able to reproduce the issue. I used parallelism
>>>> levels 4, file system backend and tried different timings for
>>>> checkpointing, windowing and source.
>>>> Do you encounter this problem deterministically, is it always 1st
>>>> checkpoint?
>>>> What checkpointing interval do you use?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:
>>>>
>>>>> Hi, this is our production code so I have to modify it a little bit,
>>>>> such as variable name and function name. I think 3 classes I provide here
>>>>> is enough.
>>>>>
>>>>> I try to join two streams, but I don't want to use the default join
>>>>> function, because I want to send the joined log immediately and remove it
>>>>> from window state immediately. And my window gap time is very long( 20
>>>>> minutes), so it maybe evaluate it multiple times.
>>>>>
>>>>> class JoinFunction extends
>>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>>
>>>>>   var ueState: ValueState[RawLog] = _
>>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>>   val invalidCounter = new LongCounter()
>>>>>   val processCounter = new LongCounter()
>>>>>   val sendToKafkaCounter = new LongCounter()
>>>>>
>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>     ueState = getRuntimeContext.getState(
>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>     )
>>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>>   }
>>>>>
>>>>>   override def process(key: String,
>>>>>                        ctx: Context,
>>>>>                        logs: Iterable[RawLog],
>>>>>                        out: Collector[OutputLog]): Unit = {
>>>>>     if (ueState.value() != null) {
>>>>>       processCounter.add(1L)
>>>>>       val bid = ueState.value()
>>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>>       logs.foreach( log => {
>>>>>         if (log.eventType == SHOW) {
>>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>>           sendToKafkaCounter.add(1L)
>>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>>         }
>>>>>       })
>>>>>     } else {
>>>>>       invalidCounter.add(1L)
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>>
>>>>>   override def onElement(log: RawLog,
>>>>>                          timestamp: Long,
>>>>>                          window: TimeWindow,
>>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>     )
>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>
>>>>>     if (!firstSeen.value()) {
>>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>>       firstSeen.update(true)
>>>>>     }
>>>>>     val eventType = log.eventType
>>>>>     if (eventType == BID) {
>>>>>       ueState.update(log)
>>>>>       TriggerResult.CONTINUE
>>>>>     } else {
>>>>>       if (ueState.value() == null) {
>>>>>         TriggerResult.CONTINUE
>>>>>       } else {
>>>>>         TriggerResult.FIRE
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>
>>>>>   override def onEventTime(timestamp: Long,
>>>>>                            window: TimeWindow,
>>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>     if (timestamp == window.getEnd) {
>>>>>       TriggerResult.PURGE
>>>>>     }
>>>>>     TriggerResult.CONTINUE
>>>>>   }
>>>>>
>>>>>   override def onProcessingTime(timestamp: Long,
>>>>>                                 window: TimeWindow,
>>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>>     TriggerResult.CONTINUE
>>>>>   }
>>>>>
>>>>>   override def clear(window: TimeWindow,
>>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>>     )
>>>>>     ueState.clear()
>>>>>
>>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>>     firstSeen.clear()
>>>>>
>>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>>   }
>>>>> }
>>>>>
>>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>>
>>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>>                            size: Int,
>>>>>                            window: TimeWindow,
>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>>
>>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>>                            size: Int,
>>>>>                            window: TimeWindow,
>>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>>     val iter = elements.iterator()
>>>>>     while (iter.hasNext) {
>>>>>       iter.next()
>>>>>       iter.remove()
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午7:18写道:
>>>>>
>>>>>> Thanks for the clarification.
>>>>>>
>>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:
>>>>>>
>>>>>>> Rocksdb backend has the same problem
>>>>>>>
>>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>>> 下午6:11写道:
>>>>>>>
>>>>>>>> Thanks for reporting this.
>>>>>>>>
>>>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>>>> state entry.
>>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>>> further investigate it.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>>>>>> evictor. The state is stored to memory.
>>>>>>>>>
>>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>>         .keyBy(_.key)
>>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>>         .process(new
>>>>>>>>> MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>>
>>>>>>>>> And the exception stack is here, could anyone help with this?
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for
>>>>>>>>> operator Window(TumblingEventTimeWindows(1200000), JoinTrigger,
>>>>>>>>> JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink:
>>>>>>>>> kafka-record-sink (2/5).
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>>>>> StreamTask.java:1100)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>>> ThreadPoolExecutor.java:1149)
>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>>>> ThreadPoolExecutor.java:624)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>>>>>> ClassCastException:
>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>     at org.apache.flink.streaming.api.operators.
>>>>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47
>>>>>>>>> )
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>     ... 3 more
>>>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>>>>>>>> cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>> CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot
>>>>>>>>> .java:114)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>> AbstractStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>> AbstractStateTableSnapshot.java:121)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>>>>>> AsyncSnapshotCallable.java:75)
>>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>     ... 5 more
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>
> --
> Best regards
>
> Sili Liu
>

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Si-li Liu <un...@gmail.com>.
Thanks for your help

1. I started the job from scratch, not a savepoint or externalized
checkpoint
2. No job graph change
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
4. My Flink version is 1.9.1

On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <kh...@gmail.com> wrote:

> I still wasn't able to reproduce the issue.
>
> Can you also clarify:
> - Are you starting the job from a savepoint or externalized checkpoint?
> - If yes, was the job graph changed?
> - What StreamTimeCharacteristic is set, if any?
> - What exact version of Flink do you use?
>
> Regards,
> Roman
>
>
> On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <un...@gmail.com> wrote:
>
>> Hi, Thanks for your help.
>>
>> The checkpoint configuration is
>>
>> checkpoint.intervalMS=300000
>> checkpoint.timeoutMS=300000
>>
>> The error callstack is from JM's log, which happened in every cp.
>> Currently I don't have a success cp yet.
>>
>> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 上午3:50写道:
>>
>>> Hi,
>>>
>>> Thanks for the details.
>>> However, I was not able to reproduce the issue. I used parallelism
>>> levels 4, file system backend and tried different timings for
>>> checkpointing, windowing and source.
>>> Do you encounter this problem deterministically, is it always 1st
>>> checkpoint?
>>> What checkpointing interval do you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:
>>>
>>>> Hi, this is our production code so I have to modify it a little bit,
>>>> such as variable name and function name. I think 3 classes I provide here
>>>> is enough.
>>>>
>>>> I try to join two streams, but I don't want to use the default join
>>>> function, because I want to send the joined log immediately and remove it
>>>> from window state immediately. And my window gap time is very long( 20
>>>> minutes), so it maybe evaluate it multiple times.
>>>>
>>>> class JoinFunction extends
>>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>>
>>>>   var ueState: ValueState[RawLog] = _
>>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>>   val invalidCounter = new LongCounter()
>>>>   val processCounter = new LongCounter()
>>>>   val sendToKafkaCounter = new LongCounter()
>>>>
>>>>   override def open(parameters: Configuration): Unit = {
>>>>     ueState = getRuntimeContext.getState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>>   }
>>>>
>>>>   override def process(key: String,
>>>>                        ctx: Context,
>>>>                        logs: Iterable[RawLog],
>>>>                        out: Collector[OutputLog]): Unit = {
>>>>     if (ueState.value() != null) {
>>>>       processCounter.add(1L)
>>>>       val bid = ueState.value()
>>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>>       logs.foreach( log => {
>>>>         if (log.eventType == SHOW) {
>>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>>           sendToKafkaCounter.add(1L)
>>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>>         }
>>>>       })
>>>>     } else {
>>>>       invalidCounter.add(1L)
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>>
>>>>   override def onElement(log: RawLog,
>>>>                          timestamp: Long,
>>>>                          window: TimeWindow,
>>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>
>>>>     if (!firstSeen.value()) {
>>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>>       firstSeen.update(true)
>>>>     }
>>>>     val eventType = log.eventType
>>>>     if (eventType == BID) {
>>>>       ueState.update(log)
>>>>       TriggerResult.CONTINUE
>>>>     } else {
>>>>       if (ueState.value() == null) {
>>>>         TriggerResult.CONTINUE
>>>>       } else {
>>>>         TriggerResult.FIRE
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>>   override def onEventTime(timestamp: Long,
>>>>                            window: TimeWindow,
>>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     if (timestamp == window.getEnd) {
>>>>       TriggerResult.PURGE
>>>>     }
>>>>     TriggerResult.CONTINUE
>>>>   }
>>>>
>>>>   override def onProcessingTime(timestamp: Long,
>>>>                                 window: TimeWindow,
>>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>>     TriggerResult.CONTINUE
>>>>   }
>>>>
>>>>   override def clear(window: TimeWindow,
>>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>>     )
>>>>     ueState.clear()
>>>>
>>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>>     firstSeen.clear()
>>>>
>>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>>   }
>>>> }
>>>>
>>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>>
>>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>>                            size: Int,
>>>>                            window: TimeWindow,
>>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>>
>>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>>                            size: Int,
>>>>                            window: TimeWindow,
>>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>>     val iter = elements.iterator()
>>>>     while (iter.hasNext) {
>>>>       iter.next()
>>>>       iter.remove()
>>>>     }
>>>>   }
>>>> }
>>>>
>>>>
>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午7:18写道:
>>>>
>>>>> Thanks for the clarification.
>>>>>
>>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:
>>>>>
>>>>>> Rocksdb backend has the same problem
>>>>>>
>>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四
>>>>>> 下午6:11写道:
>>>>>>
>>>>>>> Thanks for reporting this.
>>>>>>>
>>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>>> state entry.
>>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>>> further investigate it.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>>>>> evictor. The state is stored to memory.
>>>>>>>>
>>>>>>>> input.setParallelism(processParallelism)
>>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>>         .keyBy(_.key)
>>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>>         .trigger(new MyTrigger)
>>>>>>>>         .evictor(new MyEvictor)
>>>>>>>>         .process(new
>>>>>>>> MyFunction).setParallelism(aggregateParallelism)
>>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>>         .name("kafka-record-sink")
>>>>>>>>
>>>>>>>> And the exception stack is here, could anyone help with this?
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for
>>>>>>>> operator Window(TumblingEventTimeWindows(1200000), JoinTrigger,
>>>>>>>> JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink:
>>>>>>>> kafka-record-sink (2/5).
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>>>> StreamTask.java:1100)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>> ThreadPoolExecutor.java:1149)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>>> ThreadPoolExecutor.java:624)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>>>>> ClassCastException:
>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot
>>>>>>>> be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>     at org.apache.flink.streaming.api.operators.
>>>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>     ... 3 more
>>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot
>>>>>>>> be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>> CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot
>>>>>>>> .java:114)
>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>> AbstractStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>> AbstractStateTableSnapshot.java:121)
>>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>>>>> AsyncSnapshotCallable.java:75)
>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>     ... 5 more
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Khachatryan Roman <kh...@gmail.com>.
I still wasn't able to reproduce the issue.

Can you also clarify:
- Are you starting the job from a savepoint or externalized checkpoint?
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any?
- What exact version of Flink do you use?

Regards,
Roman


On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <un...@gmail.com> wrote:

> Hi, Thanks for your help.
>
> The checkpoint configuration is
>
> checkpoint.intervalMS=300000
> checkpoint.timeoutMS=300000
>
> The error callstack is from JM's log, which happened in every cp.
> Currently I don't have a success cp yet.
>
> Khachatryan Roman <kh...@gmail.com> 于2020年7月3日周五 上午3:50写道:
>
>> Hi,
>>
>> Thanks for the details.
>> However, I was not able to reproduce the issue. I used parallelism levels
>> 4, file system backend and tried different timings for
>> checkpointing, windowing and source.
>> Do you encounter this problem deterministically, is it always 1st
>> checkpoint?
>> What checkpointing interval do you use?
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:
>>
>>> Hi, this is our production code so I have to modify it a little bit,
>>> such as variable name and function name. I think 3 classes I provide here
>>> is enough.
>>>
>>> I try to join two streams, but I don't want to use the default join
>>> function, because I want to send the joined log immediately and remove it
>>> from window state immediately. And my window gap time is very long( 20
>>> minutes), so it maybe evaluate it multiple times.
>>>
>>> class JoinFunction extends
>>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>>
>>>   var ueState: ValueState[RawLog] = _
>>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>>   val invalidCounter = new LongCounter()
>>>   val processCounter = new LongCounter()
>>>   val sendToKafkaCounter = new LongCounter()
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     ueState = getRuntimeContext.getState(
>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>     )
>>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>>   }
>>>
>>>   override def process(key: String,
>>>                        ctx: Context,
>>>                        logs: Iterable[RawLog],
>>>                        out: Collector[OutputLog]): Unit = {
>>>     if (ueState.value() != null) {
>>>       processCounter.add(1L)
>>>       val bid = ueState.value()
>>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>>       logs.foreach( log => {
>>>         if (log.eventType == SHOW) {
>>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>>           sendToKafkaCounter.add(1L)
>>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>>         }
>>>       })
>>>     } else {
>>>       invalidCounter.add(1L)
>>>     }
>>>   }
>>> }
>>>
>>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>>
>>>   override def onElement(log: RawLog,
>>>                          timestamp: Long,
>>>                          window: TimeWindow,
>>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>     )
>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>
>>>     if (!firstSeen.value()) {
>>>       ctx.registerEventTimeTimer(window.getEnd)
>>>       firstSeen.update(true)
>>>     }
>>>     val eventType = log.eventType
>>>     if (eventType == BID) {
>>>       ueState.update(log)
>>>       TriggerResult.CONTINUE
>>>     } else {
>>>       if (ueState.value() == null) {
>>>         TriggerResult.CONTINUE
>>>       } else {
>>>         TriggerResult.FIRE
>>>       }
>>>     }
>>>   }
>>>
>>>   override def onEventTime(timestamp: Long,
>>>                            window: TimeWindow,
>>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>>     if (timestamp == window.getEnd) {
>>>       TriggerResult.PURGE
>>>     }
>>>     TriggerResult.CONTINUE
>>>   }
>>>
>>>   override def onProcessingTime(timestamp: Long,
>>>                                 window: TimeWindow,
>>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>>     TriggerResult.CONTINUE
>>>   }
>>>
>>>   override def clear(window: TimeWindow,
>>>                      ctx: Trigger.TriggerContext): Unit = {
>>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>>     )
>>>     ueState.clear()
>>>
>>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>>     firstSeen.clear()
>>>
>>>     ctx.deleteEventTimeTimer(window.getEnd)
>>>   }
>>> }
>>>
>>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>>
>>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>>                            size: Int,
>>>                            window: TimeWindow,
>>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>>
>>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>>                            size: Int,
>>>                            window: TimeWindow,
>>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>>     val iter = elements.iterator()
>>>     while (iter.hasNext) {
>>>       iter.next()
>>>       iter.remove()
>>>     }
>>>   }
>>> }
>>>
>>>
>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午7:18写道:
>>>
>>>> Thanks for the clarification.
>>>>
>>>> Can you also share the code of other parts, particularly MyFunction?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:
>>>>
>>>>> Rocksdb backend has the same problem
>>>>>
>>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午6:11写道:
>>>>>
>>>>>> Thanks for reporting this.
>>>>>>
>>>>>> Looks like the window namespace was replaced by VoidNamespace in
>>>>>> state entry.
>>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>>> further investigate it.
>>>>>>
>>>>>> Regards,
>>>>>> Roman
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>>>> evictor. The state is stored to memory.
>>>>>>>
>>>>>>> input.setParallelism(processParallelism)
>>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>>         .keyBy(_.key)
>>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>>         .trigger(new MyTrigger)
>>>>>>>         .evictor(new MyEvictor)
>>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>>         .name("kafka-record-sink")
>>>>>>>
>>>>>>> And the exception stack is here, could anyone help with this? Thanks!
>>>>>>>
>>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for
>>>>>>> operator Window(TumblingEventTimeWindows(1200000), JoinTrigger,
>>>>>>> JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink:
>>>>>>> kafka-record-sink (2/5).
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>>> StreamTask.java:1100)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>> ThreadPoolExecutor.java:1149)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>> ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>>>> ClassCastException:
>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot
>>>>>>> be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>     at org.apache.flink.streaming.api.operators.
>>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>     ... 3 more
>>>>>>> Caused by: java.lang.ClassCastException:
>>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot
>>>>>>> be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>> CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot
>>>>>>> .java:114)
>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>> AbstractStateTableSnapshot.writeStateInKeyGroup(
>>>>>>> AbstractStateTableSnapshot.java:121)
>>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>>>> AsyncSnapshotCallable.java:75)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>     ... 5 more
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Si-li Liu <un...@gmail.com>.
Hi, Thanks for your help.

The checkpoint configuration is

checkpoint.intervalMS=300000
checkpoint.timeoutMS=300000

The error call stack is from the JobManager's log, and it shows up on every
checkpoint. So far I don't have a single successful checkpoint.
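
(To be clear, the checkpoint.* keys above are our own property names, not Flink options. Below is
a minimal sketch, not copied from the real job, of how they map onto the Flink 1.9 API.)

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// checkpoint.intervalMS=300000 -> trigger a checkpoint every 5 minutes
env.enableCheckpointing(300000L)
// checkpoint.timeoutMS=300000 -> fail any checkpoint that takes longer than 5 minutes
env.getCheckpointConfig.setCheckpointTimeout(300000L)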

On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <kh...@gmail.com> wrote:

> Hi,
>
> Thanks for the details.
> However, I was not able to reproduce the issue. I used parallelism levels
> 4, file system backend and tried different timings for
> checkpointing, windowing and source.
> Do you encounter this problem deterministically, is it always 1st
> checkpoint?
> What checkpointing interval do you use?
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:
>
>> Hi, this is our production code so I have to modify it a little bit, such
>> as variable name and function name. I think 3 classes I provide here is
>> enough.
>>
>> I try to join two streams, but I don't want to use the default join
>> function, because I want to send the joined log immediately and remove it
>> from window state immediately. And my window gap time is very long( 20
>> minutes), so it maybe evaluate it multiple times.
>>
>> class JoinFunction extends
>>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>>
>>   var ueState: ValueState[RawLog] = _
>>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>>   val invalidCounter = new LongCounter()
>>   val processCounter = new LongCounter()
>>   val sendToKafkaCounter = new LongCounter()
>>
>>   override def open(parameters: Configuration): Unit = {
>>     ueState = getRuntimeContext.getState(
>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>     )
>>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>>   }
>>
>>   override def process(key: String,
>>                        ctx: Context,
>>                        logs: Iterable[RawLog],
>>                        out: Collector[OutputLog]): Unit = {
>>     if (ueState.value() != null) {
>>       processCounter.add(1L)
>>       val bid = ueState.value()
>>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>>       logs.foreach( log => {
>>         if (log.eventType == SHOW) {
>>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>>           sendToKafkaCounter.add(1L)
>>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>>         }
>>       })
>>     } else {
>>       invalidCounter.add(1L)
>>     }
>>   }
>> }
>>
>> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>>
>>   override def onElement(log: RawLog,
>>                          timestamp: Long,
>>                          window: TimeWindow,
>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>     )
>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>
>>     if (!firstSeen.value()) {
>>       ctx.registerEventTimeTimer(window.getEnd)
>>       firstSeen.update(true)
>>     }
>>     val eventType = log.eventType
>>     if (eventType == BID) {
>>       ueState.update(log)
>>       TriggerResult.CONTINUE
>>     } else {
>>       if (ueState.value() == null) {
>>         TriggerResult.CONTINUE
>>       } else {
>>         TriggerResult.FIRE
>>       }
>>     }
>>   }
>>
>>   override def onEventTime(timestamp: Long,
>>                            window: TimeWindow,
>>                            ctx: Trigger.TriggerContext): TriggerResult = {
>>     if (timestamp == window.getEnd) {
>>       TriggerResult.PURGE
>>     }
>>     TriggerResult.CONTINUE
>>   }
>>
>>   override def onProcessingTime(timestamp: Long,
>>                                 window: TimeWindow,
>>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>>     TriggerResult.CONTINUE
>>   }
>>
>>   override def clear(window: TimeWindow,
>>                      ctx: Trigger.TriggerContext): Unit = {
>>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>>     )
>>     ueState.clear()
>>
>>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>>     firstSeen.clear()
>>
>>     ctx.deleteEventTimeTimer(window.getEnd)
>>   }
>> }
>>
>> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>>
>>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>>                            size: Int,
>>                            window: TimeWindow,
>>                            evictorContext: Evictor.EvictorContext): Unit = {}
>>
>>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>>                            size: Int,
>>                            window: TimeWindow,
>>                            evictorContext: Evictor.EvictorContext): Unit = {
>>     val iter = elements.iterator()
>>     while (iter.hasNext) {
>>       iter.next()
>>       iter.remove()
>>     }
>>   }
>> }
>>
>>
>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午7:18写道:
>>
>>> Thanks for the clarification.
>>>
>>> Can you also share the code of other parts, particularly MyFunction?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:
>>>
>>>> Rocksdb backend has the same problem
>>>>
>>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午6:11写道:
>>>>
>>>>> Thanks for reporting this.
>>>>>
>>>>> Looks like the window namespace was replaced by VoidNamespace in state
>>>>> entry.
>>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>>> further investigate it.
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:
>>>>>
>>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>>> evictor. The state is stored to memory.
>>>>>>
>>>>>> input.setParallelism(processParallelism)
>>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>>         .keyBy(_.key)
>>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>>         .trigger(new MyTrigger)
>>>>>>         .evictor(new MyEvictor)
>>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>>         .name("kafka-record-sink")
>>>>>>
>>>>>> And the exception stack is here, could anyone help with this? Thanks!
>>>>>>
>>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>>>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>>>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>>> StreamTask.java:1100)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>> ThreadPoolExecutor.java:1149)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>> ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>>>>>> TimeWindow cannot be cast to org.apache.flink.runtime.state.
>>>>>> VoidNamespace
>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>     at org.apache.flink.streaming.api.operators.
>>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>     ... 3 more
>>>>>> Caused by: java.lang.ClassCastException:
>>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot
>>>>>> be cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>> CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot
>>>>>> .java:114)
>>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>>>>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>>     at org.apache.flink.runtime.state.heap.
>>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>>> AsyncSnapshotCallable.java:75)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>     ... 5 more
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>

-- 
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi,

Thanks for the details.
However, I was not able to reproduce the issue. I used a parallelism of 4
and the file system state backend, and tried different timings for
checkpointing, windowing and the source.
Do you encounter this problem deterministically, and is it always the 1st
checkpoint?
What checkpointing interval do you use?

Regards,
Roman


On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <un...@gmail.com> wrote:

> Hi, this is our production code so I have to modify it a little bit, such
> as variable name and function name. I think 3 classes I provide here is
> enough.
>
> I try to join two streams, but I don't want to use the default join
> function, because I want to send the joined log immediately and remove it
> from window state immediately. And my window gap time is very long( 20
> minutes), so it maybe evaluate it multiple times.
>
> class JoinFunction extends
>   ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{
>
>   var ueState: ValueState[RawLog] = _
>   @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
>   val invalidCounter = new LongCounter()
>   val processCounter = new LongCounter()
>   val sendToKafkaCounter = new LongCounter()
>
>   override def open(parameters: Configuration): Unit = {
>     ueState = getRuntimeContext.getState(
>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>     )
>     gZipThriftSerializer = new GZipThriftSerializer[MyType]()
>     getRuntimeContext.addAccumulator("processCounter", this.processCounter)
>     getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
>     getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
>   }
>
>   override def process(key: String,
>                        ctx: Context,
>                        logs: Iterable[RawLog],
>                        out: Collector[OutputLog]): Unit = {
>     if (ueState.value() != null) {
>       processCounter.add(1L)
>       val bid = ueState.value()
>       val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
>       logs.foreach( log => {
>         if (log.eventType == SHOW) {
>           val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
>           sendToKafkaCounter.add(1L)
>           out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
>         }
>       })
>     } else {
>       invalidCounter.add(1L)
>     }
>   }
> }
>
> class JoinTrigger extends Trigger[RawLog, TimeWindow] {
>
>   override def onElement(log: RawLog,
>                          timestamp: Long,
>                          window: TimeWindow,
>                          ctx: Trigger.TriggerContext): TriggerResult = {
>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>     )
>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>
>     if (!firstSeen.value()) {
>       ctx.registerEventTimeTimer(window.getEnd)
>       firstSeen.update(true)
>     }
>     val eventType = log.eventType
>     if (eventType == BID) {
>       ueState.update(log)
>       TriggerResult.CONTINUE
>     } else {
>       if (ueState.value() == null) {
>         TriggerResult.CONTINUE
>       } else {
>         TriggerResult.FIRE
>       }
>     }
>   }
>
>   override def onEventTime(timestamp: Long,
>                            window: TimeWindow,
>                            ctx: Trigger.TriggerContext): TriggerResult = {
>     if (timestamp == window.getEnd) {
>       TriggerResult.PURGE
>     }
>     TriggerResult.CONTINUE
>   }
>
>   override def onProcessingTime(timestamp: Long,
>                                 window: TimeWindow,
>                                 ctx: Trigger.TriggerContext): TriggerResult = {
>     TriggerResult.CONTINUE
>   }
>
>   override def clear(window: TimeWindow,
>                      ctx: Trigger.TriggerContext): Unit = {
>     val ueState: ValueState[RawLog] = ctx.getPartitionedState(
>       new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
>     )
>     ueState.clear()
>
>     val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
>       new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
>     firstSeen.clear()
>
>     ctx.deleteEventTimeTimer(window.getEnd)
>   }
> }
>
> class JoinEvictor extends Evictor[RawLog, TimeWindow] {
>
>   override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
>                            size: Int,
>                            window: TimeWindow,
>                            evictorContext: Evictor.EvictorContext): Unit = {}
>
>   override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
>                            size: Int,
>                            window: TimeWindow,
>                            evictorContext: Evictor.EvictorContext): Unit = {
>     val iter = elements.iterator()
>     while (iter.hasNext) {
>       iter.next()
>       iter.remove()
>     }
>   }
> }
>
>
> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午7:18写道:
>
>> Thanks for the clarification.
>>
>> Can you also share the code of other parts, particularly MyFunction?
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:
>>
>>> Rocksdb backend has the same problem
>>>
>>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午6:11写道:
>>>
>>>> Thanks for reporting this.
>>>>
>>>> Looks like the window namespace was replaced by VoidNamespace in state
>>>> entry.
>>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>>> further investigate it.
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:
>>>>
>>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>>> evictor. The state is stored to memory.
>>>>>
>>>>> input.setParallelism(processParallelism)
>>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>>         .keyBy(_.key)
>>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>>         .trigger(new MyTrigger)
>>>>>         .evictor(new MyEvictor)
>>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>>         .name("kafka-record-sink")
>>>>>
>>>>> And the exception stack is here, could anyone help with this? Thanks!
>>>>>
>>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
>>>>> .java:1100)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>> ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>> ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>>>>> TimeWindow cannot be cast to org.apache.flink.runtime.state.
>>>>> VoidNamespace
>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>     at org.apache.flink.streaming.api.operators.
>>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>     ... 3 more
>>>>> Caused by: java.lang.ClassCastException:
>>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
>>>>> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>>>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>>     at org.apache.flink.runtime.state.heap.
>>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>>> AsyncSnapshotCallable.java:75)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>     ... 5 more
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Si-li Liu <un...@gmail.com>.
Hi, this is our production code, so I had to modify it a little (variable
and function names). I think the 3 classes I provide here are enough.

I'm trying to join two streams, but I don't want to use the default join
function, because I want to send the joined log out immediately and remove it
from the window state immediately. And my window gap time is very long (20
minutes), so the window may be evaluated multiple times. (For contrast, a
sketch of the built-in windowed join we decided against follows the three
classes below.)

class JoinFunction extends
  ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{

  var ueState: ValueState[RawLog] = _
  @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
  val invalidCounter = new LongCounter()
  val processCounter = new LongCounter()
  val sendToKafkaCounter = new LongCounter()

  override def open(parameters: Configuration): Unit = {
    ueState = getRuntimeContext.getState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    gZipThriftSerializer = new GZipThriftSerializer[MyType]()
    getRuntimeContext.addAccumulator("processCounter", this.processCounter)
    getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
    getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
  }

  override def process(key: String,
                       ctx: Context,
                       logs: Iterable[RawLog],
                       out: Collector[OutputLog]): Unit = {
    if (ueState.value() != null) {
      processCounter.add(1L)
      val bid = ueState.value()
      val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
      logs.foreach( log => {
        if (log.eventType == SHOW) {
          val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
          sendToKafkaCounter.add(1L)
          out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
        }
      })
    } else {
      invalidCounter.add(1L)
    }
  }
}

class JoinTrigger extends Trigger[RawLog, TimeWindow] {

  override def onElement(log: RawLog,
                         timestamp: Long,
                         window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))

    if (!firstSeen.value()) {
      ctx.registerEventTimeTimer(window.getEnd)
      firstSeen.update(true)
    }
    val eventType = log.eventType
    if (eventType == BID) {
      ueState.update(log)
      TriggerResult.CONTINUE
    } else {
      if (ueState.value() == null) {
        TriggerResult.CONTINUE
      } else {
        TriggerResult.FIRE
      }
    }
  }

  override def onEventTime(timestamp: Long,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    if (timestamp == window.getEnd) {
      TriggerResult.PURGE
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(timestamp: Long,
                                window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def clear(window: TimeWindow,
                     ctx: Trigger.TriggerContext): Unit = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    ueState.clear()

    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
    firstSeen.clear()

    ctx.deleteEventTimeTimer(window.getEnd)
  }
}

class JoinEvictor extends Evictor[RawLog, TimeWindow] {

  override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
                           size: Int,
                           window: TimeWindow,
                           evictorContext: Evictor.EvictorContext): Unit = {}

  override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
                           size: Int,
                           window: TimeWindow,
                           evictorContext: Evictor.EvictorContext): Unit = {
    val iter = elements.iterator()
    while (iter.hasNext) {
      iter.next()
      iter.remove()
    }
  }
}
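
For contrast, the built-in windowed join we decided against would look roughly like the sketch
below. It is only an illustration: bidStream and showStream are placeholder streams (in the real
job both event types arrive on one keyed stream), and the join emits pairs only when the window
fires, which is exactly the behaviour we want to avoid with the trigger and evictor above.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical sketch only; bidStream and showStream stand for two DataStream[RawLog] inputs.
val joined: DataStream[OutputLog] = bidStream
  .join(showStream)
  .where(_.key)
  .equalTo(_.key)
  .window(TumblingEventTimeWindows.of(Time.minutes(20)))
  .apply { (bid: RawLog, show: RawLog) =>
    // Pairs are produced only when the window is evaluated, not as soon as both sides arrive.
    new OutputLog(ThriftUtils.serialize(show), Utils.getOutputTopic(show))
  }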


On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <kh...@gmail.com> wrote:

> Thanks for the clarification.
>
> Can you also share the code of other parts, particularly MyFunction?
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:
>
>> Rocksdb backend has the same problem
>>
>> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午6:11写道:
>>
>>> Thanks for reporting this.
>>>
>>> Looks like the window namespace was replaced by VoidNamespace in state
>>> entry.
>>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>>> further investigate it.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:
>>>
>>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>>> evictor. The state is stored to memory.
>>>>
>>>> input.setParallelism(processParallelism)
>>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>>         .keyBy(_.key)
>>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>>         .trigger(new MyTrigger)
>>>>         .evictor(new MyEvictor)
>>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>>         .name("kafka-record-sink")
>>>>
>>>> And the exception stack is here, could anyone help with this? Thanks!
>>>>
>>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
>>>> .java:1100)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>> ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>> ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>>>> TimeWindow cannot be cast to org.apache.flink.runtime.state.
>>>> VoidNamespace
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>>     at org.apache.flink.streaming.api.operators.
>>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>     ... 3 more
>>>> Caused by: java.lang.ClassCastException:
>>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer
>>>> .serialize(VoidNamespaceSerializer.java:32)
>>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
>>>> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>>     at org.apache.flink.runtime.state.heap.
>>>> CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(
>>>> CopyOnWriteStateTableSnapshot.java:37)
>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>>> AsyncSnapshotCallable.java:75)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>>     ... 5 more
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>

-- 
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Khachatryan Roman <kh...@gmail.com>.
Thanks for the clarification.

Can you also share the code of other parts, particularly MyFunction?

Regards,
Roman


On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <un...@gmail.com> wrote:

> Rocksdb backend has the same problem
>
> Khachatryan Roman <kh...@gmail.com> 于2020年7月2日周四 下午6:11写道:
>
>> Thanks for reporting this.
>>
>> Looks like the window namespace was replaced by VoidNamespace in state
>> entry.
>> I've created https://issues.apache.org/jira/browse/FLINK-18464 to
>> further investigate it.
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:
>>
>>> I'm using flink 1.9 on Mesos and I try to use my own trigger and
>>> evictor. The state is stored to memory.
>>>
>>> input.setParallelism(processParallelism)
>>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>>         .keyBy(_.key)
>>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>>         .trigger(new MyTrigger)
>>>         .evictor(new MyEvictor)
>>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>>         .name("kafka-record-sink")
>>>
>>> And the exception stack is here, could anyone help with this? Thanks!
>>>
>>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
>>> .java:1100)
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>>> TimeWindow cannot be cast to org.apache.flink.runtime.state.
>>> VoidNamespace
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>>     at org.apache.flink.streaming.api.operators.
>>> OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>     ... 3 more
>>> Caused by: java.lang.ClassCastException:
>>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>>> cast to org.apache.flink.runtime.state.VoidNamespace
>>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(
>>> VoidNamespaceSerializer.java:32)
>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
>>> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot
>>> .writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>> .callInternal(HeapSnapshotStrategy.java:191)
>>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>>> .callInternal(HeapSnapshotStrategy.java:158)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>>> AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils
>>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>>     ... 5 more
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Si-li Liu <un...@gmail.com>.
The RocksDB backend has the same problem.
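
(We switched backends roughly as in the sketch below; the checkpoint URI is a placeholder, not
our real path, and env is the job's StreamExecutionEnvironment.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

// Minimal sketch of enabling the RocksDB state backend with incremental checkpoints.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))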

On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <kh...@gmail.com> wrote:

> Thanks for reporting this.
>
> Looks like the window namespace was replaced by VoidNamespace in state
> entry.
> I've created https://issues.apache.org/jira/browse/FLINK-18464 to further
> investigate it.
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:
>
>> I'm using flink 1.9 on Mesos and I try to use my own trigger and evictor.
>> The state is stored to memory.
>>
>> input.setParallelism(processParallelism)
>>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>>         .keyBy(_.key)
>>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>>         .trigger(new MyTrigger)
>>         .evictor(new MyEvictor)
>>         .process(new MyFunction).setParallelism(aggregateParallelism)
>>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>>         .name("kafka-record-sink")
>>
>> And the exception stack is here, could anyone help with this? Thanks!
>>
>> java.lang.Exception: Could not materialize checkpoint 1 for operator
>> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
>> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>>     at org.apache.flink.streaming.runtime.tasks.
>> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
>> .java:1100)
>>     at org.apache.flink.streaming.runtime.tasks.
>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.ExecutionException: java.lang.
>> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
>> TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.runtime.concurrent.FutureUtils
>> .runIfNotDoneAndGet(FutureUtils.java:450)
>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer
>> .<init>(OperatorSnapshotFinalizer.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.
>> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>     ... 3 more
>> Caused by: java.lang.ClassCastException:
>> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be
>> cast to org.apache.flink.runtime.state.VoidNamespace
>>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(
>> VoidNamespaceSerializer.java:32)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
>> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
>> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot
>> .writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>> .callInternal(HeapSnapshotStrategy.java:191)
>>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
>> .callInternal(HeapSnapshotStrategy.java:158)
>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
>> AsyncSnapshotCallable.java:75)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.runtime.concurrent.FutureUtils
>> .runIfNotDoneAndGet(FutureUtils.java:447)
>>     ... 5 more
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>

-- 
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Posted by Khachatryan Roman <kh...@gmail.com>.
Thanks for reporting this.

It looks like the window namespace was replaced by VoidNamespace in the state
entry.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to further
investigate it.
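
I can't tell more without seeing MyTrigger/MyEvictor, but for reference, per-window
state inside a trigger should only be obtained through
TriggerContext#getPartitionedState, which scopes it to the current key and the
current window namespace. A rough sketch of such a trigger (the state name and the
firing logic are placeholders, not your actual code):

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class CountingEventTimeTrigger[T] extends Trigger[T, TimeWindow] {

  // State requested through ctx.getPartitionedState is scoped to the
  // current key *and* the current TimeWindow namespace.
  private val countDesc =
    new ValueStateDescriptor[java.lang.Long]("elementCount", classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long,
      window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(countDesc)
    val current = if (count.value() == null) 0L else count.value().longValue()
    count.update(current + 1)
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // Watermark already past the end of the window: fire immediately.
      TriggerResult.FIRE
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp())
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.getPartitionedState(countDesc).clear()
    ctx.deleteEventTimeTimer(window.maxTimestamp())
  }
}

Your trace shows a state table whose namespace serializer is VoidNamespaceSerializer
being handed TimeWindow namespaces, so I'd double-check whether the trigger, evictor
or process function registers state under the same name through different context
objects.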

Regards,
Roman


On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <un...@gmail.com> wrote:

> I'm using flink 1.9 on Mesos and I try to use my own trigger and evictor.
> The state is stored to memory.
>
> input.setParallelism(processParallelism)
>         .assignTimestampsAndWatermarks(new UETimeAssigner)
>         .keyBy(_.key)
>         .window(TumblingEventTimeWindows.of(Time.minutes(20)))
>         .trigger(new MyTrigger)
>         .evictor(new MyEvictor)
>         .process(new MyFunction).setParallelism(aggregateParallelism)
>         .addSink(kafkaSink).setParallelism(sinkParallelism)
>         .name("kafka-record-sink")
>
> And the exception stack is here, could anyone help with this? Thanks!
>
> java.lang.Exception: Could not materialize checkpoint 1 for operator
> Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor,
> ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask
> .java:1100)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: java.lang.
> ClassCastException: org.apache.flink.streaming.api.windowing.windows.
> TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
> FutureUtils.java:450)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer
> .<init>(OperatorSnapshotFinalizer.java:47)
>     at org.apache.flink.streaming.runtime.tasks.
> StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>     ... 3 more
> Caused by: java.lang.ClassCastException:
> org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast
> to org.apache.flink.runtime.state.VoidNamespace
>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(
> VoidNamespaceSerializer.java:32)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot
> .writeState(CopyOnWriteStateMapSnapshot.java:114)
>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot
> .writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot
> .writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
> .callInternal(HeapSnapshotStrategy.java:191)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1
> .callInternal(HeapSnapshotStrategy.java:158)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(
> AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
> FutureUtils.java:447)
>     ... 5 more
>
> --
> Best regards
>
> Sili Liu
>