Posted to user@flink.apache.org by aris kol <gi...@hotmail.com> on 2016/08/27 22:13:07 UTC

Accessing state in connected streams

Let's say I have two types sharing the same trait

trait Event {
  def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream in Flink.

Let's say I have two streams

Events of type A create state:

val typeAStream = env.addSource(...)
  .flatMap(someUnmarshallerForA)
  .keyBy(_.id)
  .mapWithState(...)

val typeBStream = env.addSource(...)
  .flatMap(someUnmarshallerForB)
  .keyBy(_.id)

I now want to process the events in typeBStream using the information stored in the state of typeAStream.

One approach would be to use a single stream for the two topics and then pattern match, but the number of Event subclasses may grow and they may have different loads, so I may want to keep things separate.

Would something along the lines of:

typeAStream.connect(typeBStream)
  .flatMap(
    new IdentityFlatMapFunction(),
    new SomeRichFlatMapFunctionForEventB[EventB, O] with StatefulFunction[EventB, O, G[EventA]] { ... }
  )

work?

I tried this approach and ended up with an NPE because the state object was not initialized (meaning it was not there).


Thanks,
Aris


Re: Accessing state in connected streams

Posted by aris kol <gi...@hotmail.com>.
Worked like a charm.

I realise I tried to do something stupid.
The state created by EventA was handled by a different operator and I was trying to find a way to access it downstream.
As I understand it, state is operator-scoped, which means that only events passing through that operator can interact with it.

I kind of think this implementation is not ideal anyway: since those events share a key, it would be better to just use a single stream and pattern match over it (so a few network shuffles can be avoided).
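
A minimal sketch of the single-stream idea mentioned above, for readers who want to see its shape. It assumes the Flink 1.x Scala DataStream API and reuses the simplified Int/String event types from the example in this thread. The names SingleStreamExample and MatchingFunction and the output strings are made up for illustration and are not from the original posts.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object SingleStreamExample {

  // Hypothetical simplified event types; the original post uses Id, InfoA and InfoB.
  sealed trait Event { def id: Int }
  case class EventA(id: Int, info: String) extends Event
  case class EventB(id: Int, info: String) extends Event

  // A single keyed operator sees both event types, so both branches
  // of the pattern match work on the same per-key state.
  class MatchingFunction extends RichFlatMapFunction[Event, String] {

    private val seenDescriptor =
      new ListStateDescriptor[String]("seen", StringSerializer.INSTANCE)

    override def flatMap(in: Event, out: Collector[String]): Unit = {
      val seen = getRuntimeContext.getListState(seenDescriptor)
      in match {
        case EventA(_, info) =>
          // remember what A events carried for this key
          seen.add(info)
        case EventB(_, info) =>
          // depending on the state backend, get() may return null or an
          // empty Iterable when nothing is stored for the key yet
          val soFar = Option(seen.get()).map(_.asScala.toList).getOrElse(Nil)
          out.collect(s"$info processed with ${soFar.mkString(", ")}")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val streamA: DataStream[Event] = env.fromElements[Event](EventA(1, "hello"), EventA(1, "ciao"))
    val streamB: DataStream[Event] = env.fromElements[Event](EventB(1, "a"), EventB(1, "b"))

    // union both inputs into one stream and key it once
    streamA.union(streamB).keyBy(_.id).flatMap(new MatchingFunction).print()

    env.execute()
  }
}
```

The trade-off is the one raised in the original question: a single operator keeps the state in one place, but every event type then goes through the same operator and shares its parallelism.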


________________________________
From: Aljoscha Krettek <al...@apache.org>
Sent: Tuesday, August 30, 2016 2:48 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Ah I see, I'm afraid StatefulFunction is more of an internal implementation detail that cannot be used like that.

This is a small example that shows how you could do a stateful Co-FlatMap function:

object StateExample {

  trait Base { def id: Int }

  case class EventA(id: Int, info: String)
  case class EventB(id: Int, info: String)

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val sourceA = env.fromElements(EventA(1, "hello"), EventA(1, "ciao"))
    val sourceB = env.fromElements(EventB(1, "a"), EventB(1, "b"))

    sourceA.keyBy(_.id).connect(sourceB.keyBy(_.id)).flatMap(
      new RichCoFlatMapFunction[EventA, EventB, String] {

        val stateDescriptor = new ListStateDescriptor[String]("seen", StringSerializer.INSTANCE)

        def flatMap1(in: EventA, out: Collector[String]) = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          // add to state for the key of in (the key is used implicitly)
          state.add(in.info)
        }

        def flatMap2(in: EventB, out: Collector[String]) = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          println(s"GOT $in have seen so far: ${state.get()}")
        }
      })

    env.execute()

  }
}

Let me know if you need more details.

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 16:21 aris kol <gi...@hotmail.com> wrote:

Hi Aljoscha,


I removed business objects and logic, etc. I am happy to post here 😊 I am sure this is a common issue when you start to seriously mess with state.


Assuming a type for the Output
And assuming that there is a function (EventA => String) in the mapWithState operator of typeAStream (implying the state is just a Seq[String] per key)

def coFun = new CoFlatMapFunction[EventA, EventB, Option[Output]] {

override def flatMap1(in: EventA, out: Collector[Option[Output]]) = out.collect(None)

override def flatMap2(in: EventB, out: Collector[Option[Output]]) = {

 new RichFlatMapFunction[EventB, Option[Output]] with StatefulFunction[EventB, Option[Output], Seq[String]] {

   lazy val stateTypeInfo: TypeInformation[Seq[String]] = implicitly[TypeInformation[Seq[String]]]
   lazy val serializer: TypeSerializer[Seq[String]] = stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
   override lazy val stateSerializer: TypeSerializer[Seq[String]] = serializer

   override def flatMap(in: EventB, out: Collector[Option[Output]]): Unit = {
     out.collect(
       applyWithState(
         in,
         (in, state) =>
           (state match {
             case None => None
             case Some(s) => Some(Output(...))
           }, state)
       )
     )
   }

   flatMap(in, out)

 }
}
}

applyWithState throws the exception, and my intuition says I am doing something seriously wrong in the instantiation. I tried to make something work using the mapWithState implementation as a guide and ended up here.

Thanks,
Aris

________________________________
From: Aljoscha Krettek <al...@apache.org>
Sent: Tuesday, August 30, 2016 10:06 AM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Hi Aris,
I think you're on the right track with using a CoFlatMap for this. Could you post the code of your CoFlatMapFunction (or send it to me privately if you have concerns about posting it publicly)? Then I could have a look.

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 15:48 aris kol <gi...@hotmail.com> wrote:

Any other opinion on this?


Thanks :)

Aris

From: aris kol <gi...@hotmail.com>
Sent: Sunday, August 28, 2016 12:04 AM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

In the implementation I am passing just one CoFlatMapFunction, where flatMap1, which operates on EventA, just emits a None (it practically does nothing) and flatMap2 tries to access the state and throws the NPE.

It wouldn't make sense to use a mapper in this context; I would still want to flatten afterwards before pushing downstream.


Aris


________________________________
From: Sameer W <sa...@axiomine.com>
Sent: Saturday, August 27, 2016 11:40 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Ok sorry about that :-). I misunderstood as I am not familiar with Scala code. Just curious though: how are you passing two MapFunctions to the flatMap function on the connected stream? The interface of ConnectedStreams requires just one CoMap function: https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html

Sameer


Re: Accessing state in connected streams

Posted by Aljoscha Krettek <al...@apache.org>.
Ah I see, I'm afraid StatefulFunction is more of an internal implementation
detail that cannot be used like that.

This is a small example that shows how you could do a stateful Co-FlatMap
function:

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object StateExample {

  trait Base { def id: Int }

  case class EventA(id: Int, info: String)
  case class EventB(id: Int, info: String)

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val sourceA = env.fromElements(EventA(1, "hello"), EventA(1, "ciao"))
    val sourceB = env.fromElements(EventB(1, "a"), EventB(1, "b"))

    // key both inputs by id so that flatMap1 and flatMap2 see the same keyed state
    sourceA.keyBy(_.id).connect(sourceB.keyBy(_.id)).flatMap(
      new RichCoFlatMapFunction[EventA, EventB, String] {

        val stateDescriptor = new ListStateDescriptor[String]("seen", StringSerializer.INSTANCE)

        def flatMap1(in: EventA, out: Collector[String]): Unit = {
          val state = getRuntimeContext.getListState(stateDescriptor)
          // add to state for the key of in (the key is used implicitly)
          state.add(in.info)
        }

        def flatMap2(in: EventB, out: Collector[String]): Unit = {
          // same descriptor, so this reads what flatMap1 stored for the key of in
          val state = getRuntimeContext.getListState(stateDescriptor)
          println(s"GOT $in have seen so far: ${state.get()}")
        }
      })

    env.execute()

  }
}

Let me know if you need more details.

Cheers,
Aljoscha


Re: Accessing state in connected streams

Posted by aris kol <gi...@hotmail.com>.
Hi Aljoscha,


I removed business objects and logic, etc. I am happy to post here 😊 I am sure this is a common issue when you start to seriously mess with state.


Assuming a type for the Output
And assuming that there is a function (EventA => String) in the mapWithState operator of typeAStream (implying the state is just a Seq[String] per key)

def coFun = new CoFlatMapFunction[EventA, EventB, Option[Output]] {

override def flatMap1(in: EventA, out: Collector[Option[Output]]) = out.collect(None)

override def flatMap2(in: EventB, out: Collector[Option[Output]]) = {

 new RichFlatMapFunction[EventB, Option[Output]] with StatefulFunction[EventB, Option[Output], Seq[String]] {

   lazy val stateTypeInfo: TypeInformation[Seq[String]] = implicitly[TypeInformation[Seq[String]]]
   lazy val serializer: TypeSerializer[Seq[String]] = stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
   override lazy val stateSerializer: TypeSerializer[Seq[String]] = serializer

   override def flatMap(in: EventB, out: Collector[Option[Output]]): Unit = {
     out.collect(
       applyWithState(
         in,
         (in, state) =>
           (state match {
             case None => None
             case Some(s) => Some(Output(...))
           }, state)
       )
     )
   }

   flatMap(in, out)

 }
}
}

applyWithState throws the exception, and my intuition says I am doing something seriously wrong in the instantiation. I tried to make something work using the mapWithState implementation as a guide and ended up here.

Thanks,
Aris


Re: Accessing state in connected streams

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Aris,
I think you're on the right track with using a CoFlatMap for this. Could
you post the code of your CoFlatMapFunction (or send it to me privately if
you have concerns about posting it publicly)? Then I could have a look.

Cheers,
Aljoscha


Re: Accessing state in connected streams

Posted by aris kol <gi...@hotmail.com>.
Any other opinion on this?


Thanks :)

Aris



Re: Accessing state in connected streams

Posted by aris kol <gi...@hotmail.com>.
In the implementation I am passing just one CoFlatMapFunction, where flatMap1, which operates on EventA, just emits a None (it practically does nothing) and flatMap2 tries to access the state and throws the NPE.

It wouldn't make sense to use a mapper in this context; I would still want to flatten afterwards before pushing downstream.


Aris



Re: Accessing state in connected streams

Posted by Sameer W <sa...@axiomine.com>.
Ok sorry about that :-). I misunderstood as I am not familiar with Scala
code. Just curious though: how are you passing two MapFunctions to the
flatMap function on the connected stream? The interface of ConnectedStreams
requires just one CoMap function:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html

Sameer


Re: Accessing state in connected streams

Posted by aris kol <gi...@hotmail.com>.
Hi Sameer,


Thank you for your quick response.


I don't think event ordering is the problem here; the processor doesn't assume any ordering.

KeyedStream[EventA] stores a state of type Set[InfoA] on its key, which I would like KeyedStream[EventB] to access.

The code operates on an Option[Set[InfoA]] without inviting trouble by invoking .get.

applyWithState throws the exception because the private ValueState[S] is never initialised.

Since KeyedStream[EventA] successfully updates the state, it could be that:

- There is some wrong config in SomeRichFlatMapFunctionForEventB, which is fine and can be fixed

- I am doing something completely wrong that Flink doesn't support.
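
The never-initialised state described above can be illustrated with a toy model in plain Scala. This is not Flink code. ToyStatefulFunction, ToyRuntime and LifecycleDemo are invented names, and the model rests on the assumption (consistent with this thread, but not checked here against the Flink sources) that a rich function only gets its state handle when the runtime opens it. A function created by hand inside flatMap2 is never opened.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Toy stand-in for a stateful rich function; not a Flink type.
class ToyStatefulFunction {
  // stays null until open() runs, like the private state handle in the thread
  private var state: ArrayBuffer[String] = null

  // in this model only the runtime calls open()
  def open(): Unit = { state = ArrayBuffer.empty[String] }

  // throws a NullPointerException if open() was never called
  def process(in: String): Int = { state += in; state.size }
}

// Toy stand-in for the runtime: it opens every function registered with it.
object ToyRuntime {
  def register(f: ToyStatefulFunction): ToyStatefulFunction = { f.open(); f }
}

object LifecycleDemo {
  // registered with the runtime, so open() has run
  val managedResult: Try[Int] =
    Try(ToyRuntime.register(new ToyStatefulFunction).process("a"))

  // created by hand inside user code, so open() never runs
  val unmanagedResult: Try[Int] =
    Try(new ToyStatefulFunction().process("a"))
}
```

In this model the registered function succeeds and the hand-made one fails, which matches the suggestion elsewhere in the thread to put the state access directly into the function that is passed to flatMap on the connected streams.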


Thanks,

Aris


________________________________
From: Sameer W <sa...@axiomine.com>
Sent: Saturday, August 27, 2016 10:17 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

There is no guarantee about the order in which elements from each stream arrive in connected streams. You have to check whether the elements from stream A have arrived before using their information to process elements from stream B. Otherwise you have to buffer elements from stream B and, when elements arrive from stream A, check whether there are unprocessed elements from stream B. You might need to do that for elements from both streams, depending on how you use them.

You will get an NPE if you assume events have arrived from A but they are lagging behind.


Re: Accessing state in connected streams

Posted by Sameer W <sa...@axiomine.com>.
There is no guarantee about the order in which elements from each stream
arrive in connected streams. You have to check whether the elements from
stream A have arrived before using their information to process elements
from stream B. Otherwise you have to buffer elements from stream B and,
when elements arrive from stream A, check whether there are unprocessed
elements from stream B. You might need to do that for elements from both
streams, depending on how you use them.

You will get an NPE if you assume events have arrived from A but they are
lagging behind.
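
The buffering described above could look roughly like the following sketch. It assumes the Flink 1.x Scala API and the same list-state calls used in the example elsewhere in this thread. The class name BufferingCoFlatMap, the state names "seenA" and "pendingB", the simplified event types and the output strings are all invented for illustration.

```scala
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical simplified event types; the original post uses Id, InfoA and InfoB.
case class EventA(id: Int, info: String)
case class EventB(id: Int, info: String)

class BufferingCoFlatMap extends RichCoFlatMapFunction[EventA, EventB, String] {

  // per-key state: what A events carried, and B events still waiting for an A
  private val seenA = new ListStateDescriptor[String]("seenA", StringSerializer.INSTANCE)
  private val pendingB = new ListStateDescriptor[String]("pendingB", StringSerializer.INSTANCE)

  // get() may return null or an empty Iterable when nothing is stored for the key
  private def read(descriptor: ListStateDescriptor[String]): List[String] =
    Option(getRuntimeContext.getListState(descriptor).get())
      .map(_.asScala.toList)
      .getOrElse(Nil)

  override def flatMap1(in: EventA, out: Collector[String]): Unit = {
    getRuntimeContext.getListState(seenA).add(in.info)
    // an A has arrived for this key: emit the B events that were waiting, then drop them
    read(pendingB).foreach(b => out.collect(s"$b (buffered) with ${in.info}"))
    getRuntimeContext.getListState(pendingB).clear()
  }

  override def flatMap2(in: EventB, out: Collector[String]): Unit = {
    val infos = read(seenA)
    if (infos.isEmpty) {
      // no A seen yet for this key: keep the B event for later instead of failing
      getRuntimeContext.getListState(pendingB).add(in.info)
    } else {
      out.collect(s"${in.info} with ${infos.mkString(", ")}")
    }
  }
}
```

It would be applied to two streams keyed by the same field, for example sourceA.keyBy(_.id).connect(sourceB.keyBy(_.id)).flatMap(new BufferingCoFlatMap). Note that the buffer grows without bound for keys whose A event never arrives, so a real job would need some cleanup policy.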
