You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Marek Barak (JIRA)" <ji...@apache.org> on 2018/01/10 13:33:00 UTC

[jira] [Updated] (FLINK-8405) Keyed State in broadcasted data stream.

     [ https://issues.apache.org/jira/browse/FLINK-8405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marek Barak updated FLINK-8405:
-------------------------------
    Description: 
Hi guys,

I am trying to join two streams, where the second stream is a code list that is used to enrich the first stream. I followed the guide described here:

https://www.safaribooksonline.com/library/view/stream-processing-with/9781491974285/ch04.html

The difference is that instead of a local HashMap, I used MapState. This matters because I want my state to be properly checkpointed in case of a failure. I managed to reproduce the issue with the following code:

{code}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.junit.{Test, Assert }

/**
 * Minimal reproduction for FLINK-8405: a RichCoFlatMapFunction that requests
 * MapState while connecting a String stream with a broadcast (String, Int) stream.
 *
 * NOTE(review): running `job` fails inside StateMap.open (getRuntimeContext.getMapState)
 * with "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation."
 */
class SimpleTest extends StreamingMultipleProgramsTestBase {


  // Execution environment on which `job` builds and runs its pipeline.
  val env = StreamExecutionEnvironment.getExecutionEnvironment


  /**
   * Enriches each String from the first input with the Int stored for it in `codeList`,
   * which is filled from the (String, Int) pairs arriving on the second input.
   */
  case object StateMap extends RichCoFlatMapFunction[String, (String, Int), Int] {

    // Code list (key -> code); assigned in open().
    var codeList: MapState[String,Int] = _

    override def open(parameters: Configuration): Unit = {
      // MapState obtained from the runtime context is keyed state. This is the call that
      // throws the NullPointerException shown in this report, because the connected
      // inputs in `job` are not keyed.
      // NOTE(review): keying only the first input was reported to fail the same way;
      // presumably both connected inputs must be keyed (e.g. by the code-list key) — confirm.
      codeList = getRuntimeContext.getMapState(
        new MapStateDescriptor[String,Int]("test", classOf[String], classOf[Int])

      )
    }

    // Emits the code for `value`, or 0 if no entry for it has been stored yet.
    override def flatMap1(value: String, out: Collector[Int]): Unit = {
      val res = if(codeList.contains(value)) codeList.get(value) else 0

      out.collect(res)
    }

    // Stores one (key, code) pair from the code-list stream.
    override def flatMap2(value: (String, Int), out: Collector[Int]): Unit = {
      codeList.put(value._1, value._2)
      // NOTE(review): this closes the shared Collector after every code-list element,
      // which looks unintended for a per-element callback — confirm.
      out.close()
    }
  }

  @Test
  def job() = {

    val inputStream = env.fromCollection(List("Some", "Some2", "Some3"))
    val dictStream = env.fromCollection(List("Some" -> 1, "Some2" -> 2, "Some3" -> 3))

    // Neither side is keyed here; the code-list side is broadcast to all parallel instances.
    // The flatMap output is not connected to any sink.
    inputStream
      .connect(dictStream.broadcast)
      .flatMap(StateMap)

    env.execute()
    // Placeholder assertion (always true): the test only checks that execute() does not throw.
    Assert.assertEquals(1, 1)
  }
}
{code}

I always get the following issue:

{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:161)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:153)
	at com.triviadata.sherlog.streaming.job.SimpleTest$StateMap$.open(SimpleTest.scala:23)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:745)
{code}

I guess the main problem is:

{code}
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
{code}

I also tried: 

{code}
    inputStream
      .keyBy(a => a)
      .connect(dictStream.broadcast)
      .flatMap(StateMap)
{code}

But I still got the same issue.

FYI:
I am running:
OSX 10.13.1
Java: Oracle 1.8.0_92
Scala: 2.11.11
Flink: 1.3.2 (I also tried 1.4.0 and got the same problem).

  was:
Hi guys,

I am trying to join two streams, where the second stream is a code list that is used to enrich the first stream. I followed the guide described here:

https://www.safaribooksonline.com/library/view/stream-processing-with/9781491974285/ch04.html

The difference is that instead of a local HashMap, I used MapState. This matters because I want my state to be properly checkpointed in case of a failure. I managed to reproduce the issue with the following code:

{code}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.junit.{Test, Assert }

/**
 * Minimal reproduction for FLINK-8405: a RichCoFlatMapFunction that requests
 * MapState while connecting a String stream with a broadcast (String, Int) stream.
 *
 * NOTE(review): running `job` fails inside StateMap.open (getRuntimeContext.getMapState)
 * with "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation."
 */
class SimpleTest extends StreamingMultipleProgramsTestBase {


  // Execution environment on which `job` builds and runs its pipeline.
  val env = StreamExecutionEnvironment.getExecutionEnvironment


  /**
   * Enriches each String from the first input with the Int stored for it in `codeList`,
   * which is filled from the (String, Int) pairs arriving on the second input.
   */
  case object StateMap extends RichCoFlatMapFunction[String, (String, Int), Int] {

    // Code list (key -> code); assigned in open().
    var codeList: MapState[String,Int] = _

    override def open(parameters: Configuration): Unit = {
      // MapState obtained from the runtime context is keyed state. This is the call that
      // throws the NullPointerException shown in this report, because the connected
      // inputs in `job` are not keyed.
      // NOTE(review): presumably both connected inputs must be keyed (e.g. by the
      // code-list key) for keyed state to be available — confirm.
      codeList = getRuntimeContext.getMapState(
        new MapStateDescriptor[String,Int]("test", classOf[String], classOf[Int])

      )
    }

    // Emits the code for `value`, or 0 if no entry for it has been stored yet.
    override def flatMap1(value: String, out: Collector[Int]): Unit = {
      val res = if(codeList.contains(value)) codeList.get(value) else 0

      out.collect(res)
    }

    // Stores one (key, code) pair from the code-list stream.
    override def flatMap2(value: (String, Int), out: Collector[Int]): Unit = {
      codeList.put(value._1, value._2)
      // NOTE(review): this closes the shared Collector after every code-list element,
      // which looks unintended for a per-element callback — confirm.
      out.close()
    }
  }

  @Test
  def job() = {

    val inputStream = env.fromCollection(List("Some", "Some2", "Some3"))
    val dictStream = env.fromCollection(List("Some" -> 1, "Some2" -> 2, "Some3" -> 3))

    // Neither side is keyed here; the code-list side is broadcast to all parallel instances.
    // The flatMap output is not connected to any sink.
    inputStream
      .connect(dictStream.broadcast)
      .flatMap(StateMap)

    env.execute()
    // Placeholder assertion (always true): the test only checks that execute() does not throw.
    Assert.assertEquals(1, 1)
  }
}
{code}

I always get the following issue:

{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:161)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:153)
	at com.triviadata.sherlog.streaming.job.SimpleTest$StateMap$.open(SimpleTest.scala:23)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:745)
{code}

I guess the main problem is:

{code}
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
{code}

FYI:
I am running:
OSX 10.13.1
Java: Oracle 1.8.0_92
Scala: 2.11.11
Flink: 1.3.2 (I also tried 1.4.0 and got the same problem).


> Keyed State in broadcasted data stream.
> ---------------------------------------
>
>                 Key: FLINK-8405
>                 URL: https://issues.apache.org/jira/browse/FLINK-8405
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Marek Barak
>
> Hi guys,
> I am trying to join two streams, where the second stream is a code list that is used to enrich the first stream. I followed the guide described here:
> https://www.safaribooksonline.com/library/view/stream-processing-with/9781491974285/ch04.html
> The difference is that instead of a local HashMap, I used MapState. This matters because I want my state to be properly checkpointed in case of a failure. I managed to reproduce the issue with the following code:
> {code}
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala._
> import org.junit.{Test, Assert }
> class SimpleTest extends StreamingMultipleProgramsTestBase {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   case object StateMap extends RichCoFlatMapFunction[String, (String, Int), Int] {
>     var codeList: MapState[String,Int] = _
>     override def open(parameters: Configuration): Unit = {
>       codeList = getRuntimeContext.getMapState(
>         new MapStateDescriptor[String,Int]("test", classOf[String], classOf[Int])
>       )
>     }
>     override def flatMap1(value: String, out: Collector[Int]): Unit = {
>       val res = if(codeList.contains(value)) codeList.get(value) else 0
>       out.collect(res)
>     }
>     override def flatMap2(value: (String, Int), out: Collector[Int]): Unit = {
>       codeList.put(value._1, value._2)
>       out.close()
>     }
>   }
>   @Test
>   def job() = {
>     val inputStream = env.fromCollection(List("Some", "Some2", "Some3"))
>     val dictStream = env.fromCollection(List("Some" -> 1, "Some2" -> 2, "Some3" -> 3))
>     inputStream
>       .connect(dictStream.broadcast)
>       .flatMap(StateMap)
>     env.execute()
>     Assert.assertEquals(1, 1)
>   }
> }
> {code}
> I always get the following issue:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
> 	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:161)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:153)
> 	at com.triviadata.sherlog.streaming.job.SimpleTest$StateMap$.open(SimpleTest.scala:23)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess the main problem is:
> {code}
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
> {code}
> I also tried: 
> {code}
>     inputStream
>       .keyBy(a => a)
>       .connect(dictStream.broadcast)
>       .flatMap(StateMap)
> {code}
> But I still got the same issue.
> FYI:
> I am running:
> OSX 10.13.1
> Java: Oracle 1.8.0_92
> Scala: 2.11.11
> Flink: 1.3.2 (I also tried 1.4.0 and got the same problem).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)