Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2019/05/05 08:43:33 UTC

IllegalArgumentException with CEP & reinterpretAsKeyedStream

Hi everyone,

I have a big stream A, which is filtered by flags from a small stream B and
then unioned with another stream C to become the input for my CEP.
As the three streams A, B and C are all keyed, I expected the output stream
resulting from connecting/unioning them to be keyed as well, so I used
/reinterpretAsKeyedStream/ before passing it to CEP. With this, I get an
/IllegalArgumentException/ (full stack trace below).
If I use a parallelism of 1, or if I don't use reinterpretAsKeyedStream (and
call /keyBy/ manually instead), there is no such exception.

I don't know how to debug this error, and I'm not sure whether I should use
keyed streams with CEP at all.

Thanks and best regards,
Averell


My code:
    val cepInput = streamA.keyBy(r => (r.id1, r.id2))
        .connect(streamB.keyBy(r => (r.id1, r.id2)))
        .flatMap(new MyCandidateFilterFunction())
        .union(streamC.keyBy(r => (r.id1, r.id2)))

    val cepOutput =
        MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
            counter1, counter2,
            threshold1, threshold2)

    object MyCEP {
        def apply(input: KeyedStream[Event, _],
                  longPeriod: Int,
                  threshold: Int,
                  shortPeriod: Int): DataStream[Event] = {

            val patternLineIsUp = Pattern.begin[Event]("period1")
                .where((value: Event, ctx: CepContext[Event]) =>
                    accSum(_.counter, Seq("period1"), value, ctx) < threshold)
                .times(longPeriod - shortPeriod).consecutive()
                .next("period2")
                .where((value: Event, ctx: CepContext[Event]) =>
                    accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold &&
                        value.status == "up")
                .times(shortPeriod).consecutive()

            collectPattern(input, patternLineIsUp)
        }

        // Sum of f over the events already matched for the given pattern names,
        // plus f of the event currently being evaluated.
        private def accSum(f: Event => Long, keys: Seq[String],
                           currentEvent: Event, ctx: CepContext[Event]): Long = {
            keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
        }

        // Emit the last event matched by "period2" for every complete match.
        private def collectPattern(inputStream: KeyedStream[Event, _],
                                   pattern: Pattern[Event, Event]): DataStream[Event] =
            CEP.pattern(inputStream, pattern)
                .process((map: util.Map[String, util.List[Event]],
                          ctx: PatternProcessFunction.Context,
                          collector: Collector[Event]) => {
                    val records = map.get("period2")
                    collector.collect(records.get(records.size() - 1))
                })
    }
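
For comparison, here is a minimal sketch of the variant described above that avoids the exception: union first, then call keyBy manually instead of reinterpretAsKeyedStream. It assumes flink-streaming-scala is on the classpath. The Event case class, the sample records and the reduce step (standing in for CEP) are placeholders that are not part of the original job.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for the Event type used in the job above.
case class Event(id1: String, id2: String, counter: Long, status: String)

object UnionThenKeyBy {
  // Same composite key as in the job above.
  val keyOf: Event => (String, String) = (r: Event) => (r.id1, r.id2)

  // Union the inputs first and apply keyBy last, so that the edge into the
  // downstream keyed operator is hash-partitioned by (id1, id2).
  def build(a: DataStream[Event], c: DataStream[Event]): KeyedStream[Event, (String, String)] =
    a.union(c).keyBy(keyOf)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder inputs; the real job reads streams A (filtered by B) and C.
    val streamA = env.fromElements(Event("a", "x", 1L, "up"), Event("b", "y", 2L, "up"))
    val streamC = env.fromElements(Event("a", "x", 3L, "up"), Event("c", "z", 4L, "down"))

    build(streamA, streamC)
      // Any keyed operator works here; the job above would pass the stream to CEP.
      .reduce((l: Event, r: Event) => l.copy(counter = l.counter + r.counter))
      .print()

    env.execute("union-then-keyBy sketch")
  }
}
```

The price of this variant is one extra network shuffle in front of the keyed operator, which is presumably what reinterpretAsKeyedStream was meant to save.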

The exception:
12:43:13,103 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
	at com.mycompany.StreamingJob$.main(Streaming.scala:440)
	at com.mycompany.StreamingJob.main(Streaming.scala)
Caused by: java.lang.IllegalArgumentException
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:215)
	at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
	at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Posted by Le-Van Huyen <lv...@gmail.com>.
Hello Yun,

Below is the graph without the "map". The output of the top-left box is my
Stream A. The line from the bottom-left box to the top-right box is my
Stream B. The "co-flatmap" is a filter.
The line from the bottom-left box to the right box is my Stream C.
[image: Screen Shot 2019-05-07 at 10.06.15.png]

And here is the version with the "map":
[image: Screen Shot 2019-05-07 at 10.04.56.png]

Regards,
Averell


On Mon, May 6, 2019 at 9:30 PM Yun Tang <my...@live.com> wrote:

> Hi Averell
>
> Would you please share the Flink web graph UI to illustrate the change
> after you append a *map* operator?
>
> Best
> Yun Tang

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Posted by Yun Tang <my...@live.com>.
Hi Averell

Would you please share the Flink web graph UI to illustrate the change after you append a map operator?

Best
Yun Tang
________________________________
From: Le-Van Huyen <lv...@gmail.com>
Sent: Monday, May 6, 2019 11:15
To: Yun Tang
Cc: user@flink.apache.org
Subject: Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Thank you Yun.

I haven't tried the check from your guide yet (it would take me some time to work out how to do it).
However, I can now confirm that the "union" is the culprit. In my Flink Console GUI, I can see that the link from StreamC to CEP via "union" is a FORWARD link, not a HASH one, which means that having "keyBy" right before the "union" has no effect at all. If I put a placebo "map" between the "keyBy" on streamC and the "union", the problem is solved: .union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r))

I don't know why "union" behaves like that, though. I could not find it mentioned in any documentation.

Thanks a lot for your help.

Regards,
Averell



Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Posted by Le-Van Huyen <lv...@gmail.com>.
Thank you Yun.

I haven't tried the check from your guide yet (it would take me some time
to work out how to do it).
However, I can now confirm that the "union" is the culprit. In my Flink
Console GUI, I can see that the link from StreamC to CEP via "union" is a
FORWARD link, not a HASH one, which means that having "keyBy" right before
the "union" has no effect at all. If I put a placebo "map" between the
"keyBy" on streamC and the "union", the problem is solved:
.union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r))

I don't know why "union" behaves like that, though. I could not find it
mentioned in any documentation.

Thanks a lot for your help.

Regards,
Averell
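
A minimal sketch of the placebo-map workaround described above, assuming flink-streaming-scala is on the classpath. The Event case class, the sample records, the pass-through map on stream A (standing in for the keyed connect/flatMap with stream B) and the final print are placeholders. One possible explanation, which nothing in this thread confirms, is that keyBy only declares how records are shipped to the next operator, and since union is not an operator of its own, the forward connection requested by reinterpretAsKeyedStream ends up on the edge from streamC to CEP.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for the Event type used in the job.
case class Event(id1: String, id2: String, counter: Long, status: String)

object PlaceboMapSketch {
  val keyOf: Event => (String, String) = (r: Event) => (r.id1, r.id2)

  def build(a: DataStream[Event], c: DataStream[Event]): KeyedStream[Event, (String, String)] = {
    // Stand-in for streamA.keyBy(...).connect(streamB.keyBy(...)).flatMap(...).
    val left = a.keyBy(keyOf).map((r: Event) => r)
    // The pass-through map gives the keyBy on stream C a real operator to
    // ship records to, so the hash partitioning takes effect before the union.
    val right = c.keyBy(keyOf).map((r: Event) => r)
    // Both inputs are now partitioned by (id1, id2), so no second shuffle is requested.
    new DataStreamUtils(left.union(right)).reinterpretAsKeyedStream(keyOf)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamA = env.fromElements(Event("a", "x", 1L, "up"), Event("b", "y", 2L, "up"))
    val streamC = env.fromElements(Event("a", "x", 3L, "up"), Event("c", "z", 4L, "down"))

    build(streamA, streamC).print()

    // The JSON plan lists the ship strategy of every edge, which mirrors the
    // FORWARD/HASH labels shown in the web UI graph.
    println(env.getExecutionPlan)
    env.execute("placebo-map sketch")
  }
}
```

The two map operators and the downstream keyed operator need the same parallelism here, since the connection between them is a forward one.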


On Sun, May 5, 2019 at 11:22 PM Yun Tang <my...@live.com> wrote:

> Hi Averell
>
> I think this is because, after 'union', the input stream no longer follows
> the rule that the data must be pre-partitioned in *EXACTLY* the same way
> Flink’s keyBy would partition it [1]. An easy way to verify this is to
> follow [2] and check whether each sub-task of the union stream contains
> exactly what the corresponding downstream task contains.
>
> Best
> Yun Tang
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [2]
> https://github.com/apache/flink/blob/4e505c67542a45c82c763c12099bdfc621c9e476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java#L223
>

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Posted by Yun Tang <my...@live.com>.
Hi Averell

I think this is because, after 'union', the input stream no longer follows the rule that the data must be pre-partitioned in EXACTLY the same way Flink’s keyBy would partition it [1]. An easy way to verify this is to follow [2] and check whether each sub-task of the union stream contains exactly what the corresponding downstream task contains.

Best
Yun Tang


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://github.com/apache/flink/blob/4e505c67542a45c82c763c12099bdfc621c9e476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java#L223
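
To make "partitioned exactly the same way as keyBy" concrete, the sketch below uses Flink's KeyGroupRangeAssignment (from flink-runtime, assumed to be on the classpath) to compute, for a few keys, the key group and the sub-task that keyBy would choose. The sample keys, the parallelism of 4 and the max parallelism of 128 are assumptions for illustration only. A record that reaches any other sub-task falls outside that sub-task's key-group range, which is probably what the failing precondition in the stack trace checks.

```scala
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyGroupOwnerSketch {
  // Index of the sub-task that keyBy would route `key` to.
  def ownerOf(key: AnyRef, maxParallelism: Int, parallelism: Int): Int =
    KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // assumption: use the job's actual max parallelism
    val parallelism = 4      // assumption: parallelism of the keyed operator
    val sampleKeys = Seq(("a", "x"), ("b", "y"), ("c", "z")) // hypothetical (id1, id2) pairs

    for (key <- sampleKeys) {
      // Key group derived from the key's hashCode.
      val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)
      val owner = ownerOf(key, maxParallelism, parallelism)
      // Contiguous range of key groups owned by that sub-task.
      val range = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
        maxParallelism, parallelism, owner)
      println(s"key=$key keyGroup=$keyGroup ownerSubtask=$owner " +
        s"ownedRange=[${range.getStartKeyGroup}, ${range.getEndKeyGroup}]")
    }
  }
}
```

Comparing ownerSubtask with the index of the sub-task that actually receives a record is the essence of the check suggested in [2].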
