Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/01/11 08:21:00 UTC

[jira] [Commented] (FLINK-8405) Keyed State in broadcasted data steam.

    [ https://issues.apache.org/jira/browse/FLINK-8405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321848#comment-16321848 ] 

Fabian Hueske commented on FLINK-8405:
--------------------------------------

Connecting a keyed stream and a broadcasted stream is not supported yet; see FLINK-3659.
The reason is that the broadcasted input cannot access keyed state, and the keyed input must not have write access to state from the broadcasted side.
FLINK-3659 is targeted for the next release (Flink 1.5.0).
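
To illustrate the first point, here is a minimal toy model in plain Scala. It is not Flink's implementation or API: ToyKeyedMapState, setCurrentKey and clearCurrentKey are hypothetical names made up for this sketch. It assumes that keyed state is scoped to the key of the record currently being processed, so a record that arrives without a key (as on a broadcasted input) has no scope to read from or write to.

```scala
import scala.collection.mutable
import scala.util.Try

// Toy model only: hypothetical classes, NOT Flink's API or implementation.
object KeyedStateSketch {

  // Keeps one independent map per key of the (modelled) keyed stream.
  final class ToyKeyedMapState[K, UK, UV] {
    private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]
    private var currentKey: Option[K] = None

    // Assumption: the runtime sets the key before a keyed record is processed.
    def setCurrentKey(key: K): Unit = currentKey = Some(key)

    // Assumption: a record from a broadcasted input carries no key.
    def clearCurrentKey(): Unit = currentKey = None

    // Resolve the map that belongs to the current key, or fail if there is none.
    private def scope: mutable.Map[UK, UV] = currentKey match {
      case Some(key) => perKey.getOrElseUpdate(key, mutable.Map.empty[UK, UV])
      case None =>
        throw new IllegalStateException("No current key: keyed state is undefined here")
    }

    def put(k: UK, v: UV): Unit = scope.update(k, v)
    def get(k: UK): Option[UV] = scope.get(k)
  }

  def main(args: Array[String]): Unit = {
    val state = new ToyKeyedMapState[String, String, Int]

    // A keyed record with key "Some" writes into its own scope.
    state.setCurrentKey("Some")
    state.put("code", 1)
    println(state.get("code")) // Some(1)

    // A record with a different key sees a separate, empty scope.
    state.setCurrentKey("Some2")
    println(state.get("code")) // None

    // A broadcasted record has no key, so the access cannot be resolved.
    state.clearCurrentKey()
    println(Try(state.put("Some3", 3)).isFailure) // true
  }
}
```

In this model, even if the broadcasted side could write under some key, records with other keys would not see the value, which is why a code list kept in keyed state would not behave like a shared lookup table.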

The book only describes keyed-keyed and forward-broadcast connections. However, it should have pointed out that keyed-broadcast connections are not supported at the moment.

[~mbarak] If you agree, would you close this issue as a duplicate of FLINK-3659?
Thanks.

> Keyed State in broadcasted data steam. 
> ---------------------------------------
>
>                 Key: FLINK-8405
>                 URL: https://issues.apache.org/jira/browse/FLINK-8405
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Marek Barak
>
> Hi guys,
> I am trying to join two streams, where the second stream is a code list used by the first stream for enrichment. I followed the guide described here:
> https://www.safaribooksonline.com/library/view/stream-processing-with/9781491974285/ch04.html
> The difference is that instead of a local HashMap, I used MapState. This matters because I want my state to be properly checkpointed in case of a failure. I managed to reproduce the issue with the following code:
> {code}
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala._
> import org.junit.{Test, Assert }
> class SimpleTest extends StreamingMultipleProgramsTestBase {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   case object StateMap extends RichCoFlatMapFunction[String, (String, Int), Int] {
>     var codeList: MapState[String,Int] = _
>     override def open(parameters: Configuration): Unit = {
>       codeList = getRuntimeContext.getMapState(
>         new MapStateDescriptor[String,Int]("test", classOf[String], classOf[Int])
>       )
>     }
>     override def flatMap1(value: String, out: Collector[Int]): Unit = {
>       val res = if(codeList.contains(value)) codeList.get(value) else 0
>       out.collect(res)
>     }
>     override def flatMap2(value: (String, Int), out: Collector[Int]): Unit = {
>       codeList.put(value._1, value._2)
>       out.close()
>     }
>   }
>   @Test
>   def job() = {
>     val inputStream = env.fromCollection(List("Some", "Some2", "Some3"))
>     val dictStream = env.fromCollection(List("Some" -> 1, "Some2" -> 2, "Some3" -> 3))
>     inputStream
>       .connect(dictStream.broadcast)
>       .flatMap(StateMap)
>     env.execute()
>     Assert.assertEquals(1, 1)
>   }
> }
> {code}
> I always get the following issue:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
> 	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:161)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:153)
> 	at com.triviadata.sherlog.streaming.job.SimpleTest$StateMap$.open(SimpleTest.scala:23)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess the main problem is:
> {code}
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
> {code}
> I also tried: 
> {code}
>     inputStream
>       .keyBy(a => a)
>       .connect(dictStream.broadcast)
>       .flatMap(StateMap)
> {code}
> But I still got the same issue. Either way, I think it should work without calling keyBy on either of the streams; otherwise, what would be the reason to broadcast anything?
> FYI:
> I am running:
> OSX 10.13.1
> Java: Oracle 1.8.0_92
> Scala: 2.11.11
> Flink: 1.3.2, also tried 1.4.0 but got the same problem.


