Posted to user@flink.apache.org by Nicholas Walton <nw...@me.com> on 2018/07/16 20:32:54 UTC

Parallelism and keyed streams

I have a stream of tuples <channel: Int, index: Long, value: Double>, which I turn into a KeyedStream using keyBy on channel. I then need to process each channel in parallel. Each parallel stream must be processed in strict sequential order by index to calculate the ratios value(index)/value(index-1). If I set parallelism to 1 all is well: each channel is processed in index order 1, 2, 3, 4, …

My problem is that when I set parallelism to a value greater than 1, each channel's KeyedStream appears to be split across multiple processes. So a channel may be processed incorrectly, for example as value(2), value(5), value(6), value(9), …

The number of channels N is unknown. So how do I set up N processing streams with an unknown parallelism so that each stream processes its channel by strictly increasing index v(1), v(2), …, v(t), v(t+1), …, v(t+n)?

Thanks in advance

Nick Walton

Re: Parallelism and keyed streams

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink guarantees order only within a partition. For example, if you have
the program map_1 -> map_2 and both map functions run with parallelism 4,
the order of records in each of the 4 partitions is not changed.
In the case of a shuffle (such as a keyBy or a change in parallelism), records are
shipped to the same downstream task in the same order as they are produced
by the shipping task.
However, the receiving task merges streams of received records from
multiple sending tasks without considering the order across parallel
streams.
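
A minimal sketch of this effect in plain Scala, with no Flink dependency. The object MergeOrderDemo, the helper merge and the pick sequence are all hypothetical and only model a receiver that takes records from two senders in an arbitrary interleaving; each sender's own order is preserved, yet the merged result is not sorted by index.

```scala
// Plain Scala model (not Flink API): two upstream tasks each hold part of
// channel 1, already in index order. The receiver interleaves them.
object MergeOrderDemo {
  type Reading = (Int, Long, Double) // (channel, index, value)

  // Hypothetical helper: picks(i) names the sender whose next record is
  // consumed at step i. Each sender's internal order is never changed.
  def merge(senders: Vector[List[Reading]], picks: Seq[Int]): List[Reading] = {
    val queues = senders.map(_.iterator)
    picks.flatMap { s =>
      if (queues(s).hasNext) Some(queues(s).next()) else None
    }.toList
  }

  // True if the index field is strictly increasing.
  def isSorted(rs: Seq[Reading]): Boolean =
    rs.map(_._2).sliding(2).forall {
      case Seq(a, b) => a < b
      case _         => true
    }

  val senderA: List[Reading] = List((1, 1L, 0.1), (1, 3L, 0.3), (1, 5L, 0.5))
  val senderB: List[Reading] = List((1, 2L, 0.2), (1, 4L, 0.4), (1, 6L, 0.6))

  // One possible arrival pattern at the receiving task (an assumption).
  val merged: List[Reading] = merge(Vector(senderA, senderB), Seq(0, 0, 1, 0, 1, 1))

  def main(args: Array[String]): Unit = {
    println(merged.map(_._2).mkString(", "))
    println(s"sorted: ${isSorted(merged)}")
  }
}
```

Both inputs pass isSorted on their own; only the merged sequence fails it, which is the situation described above.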

This means that you have to sort the data manually if the scaledReadings
map function receives records for the same key from different source
tasks.
Sorting streaming data is of course not as straightforward as sorting
finite sets, but you can do it with a process function and timers, i.e., you
collect data for a certain amount of time and then sort it.
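
A rough model of the collect-then-sort idea in plain Scala rather than Flink API. The names SortDemo, SortingBuffer, add and flush are hypothetical; in a real job the buffer would be keyed state and flush would be triggered by a timer. This is a sketch under those assumptions, not a definitive implementation.

```scala
import scala.collection.mutable

object SortDemo {
  type Reading = (Int, Long, Double) // (channel, index, value)

  // Buffers readings per channel and releases them sorted by index.
  final class SortingBuffer {
    private val buffers =
      mutable.Map.empty[Int, mutable.ArrayBuffer[Reading]]

    // Called for every incoming record, in whatever order it arrives.
    def add(r: Reading): Unit =
      buffers.getOrElseUpdate(r._1, mutable.ArrayBuffer.empty[Reading]) += r

    // Called when the (hypothetical) timer for a channel fires:
    // emits everything collected so far, sorted, and clears the buffer.
    def flush(channel: Int): List[Reading] =
      buffers.remove(channel).map(_.sortBy(_._2).toList).getOrElse(Nil)
  }

  def main(args: Array[String]): Unit = {
    val buf = new SortingBuffer
    Seq((1, 3L, 0.3), (1, 1L, 0.1), (2, 1L, 1.0), (1, 2L, 0.2)).foreach(buf.add)
    println(buf.flush(1)) // channel 1, in index order
  }
}
```

The waiting time is a trade-off: a record that arrives after its channel has been flushed is late and can no longer be placed in order, while a longer wait adds latency.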

Best, Fabian



2018-07-17 10:06 GMT+02:00 Nicholas Walton <nw...@me.com>:

> Martin,
>
> To clarify things, the code causing the issue is here, nothing clever. The
> code fails at the assertion that checks for consecutive index values. The
> Long index values are set earlier in sequence 1, 2, 3, 4, 5, 6, 7, …
>

Re: Parallelism and keyed streams

Posted by Nicholas Walton <nw...@me.com>.
Martin,

To clarify things, the code causing the issue is here, nothing clever. The code fails at the assertion that checks for consecutive index values (the first assert in logRatioWindowFunction.process). The Long index values are set earlier in sequence 1, 2, 3, 4, 5, 6, 7, …

val scaledReadings: DataStream[(Int, Long, Double, Double)] = maxChannelReading
      .keyBy(0)
      .map { in =>
        LOG.info(s"scaledReadings $in")
        (in._1, in._2, in._3 / in._4 + 2.0D, in._3) }


val logRatioWindow: DataStream[(Int, Long, Int, Double)] = scaledReadings
      .keyBy(0)
      .countWindow(100, 99)
      .process(new logRatioWindowFunction())


and

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
import scala.math.{E, log, rint}

class logRatioWindowFunction extends ProcessWindowFunction[(Int, Long, Double, Double), (Int, Long, Int, Double), Tuple, GlobalWindow] {

  override def process(key: Tuple, context: Context, input: Iterable[(Int, Long, Double, Double)], out: Collector[(Int, Long, Int, Double)]): Unit = {

    // Materialise the window contents once: (channel, index, scaled value, raw value)
    val a: Array[(Int, Long, Double, Double)] = input.toArray
    val ch = a(0)._1
    val s = a(0)._2
    val l = a.length

    if (l < 100) Job.LOG.info(s"Log ratio window length $l on channel $ch at sample $s")

    // This is the line that fails: indexes within a window must be consecutive.
    for (i <- 1 until a.length) assert(a(i)._2 == a(i-1)._2 + 1, "logRatioWindowFunction:Failure non-monotonic indexes " + a(i-1)._2 + " and " + a(i)._2)

    if (l == 100) {
      for (i <- 0 to l - 2) {
        // Rounded, scaled log of the ratio between neighbouring scaled values
        val v: Int = rint(100 * log(E + a(i+1)._3 / a(i)._3)).toInt
        assert(v > 0, "Bad minhash in medianLogRatioWindowFunction " + v)
        Job.LOG.debug("logRatioWindowFunction [" + a(i+1)._1 + ", " + a(i+1)._2 + ", " + v + ", " + a(i+1)._4 + "]")
        out.collect((a(i+1)._1, a(i+1)._2, v, a(i+1)._4))
      }
      Job.LOG.debug("logRatioWindowFunction [" + a.head._1 + ", " + a.head._2 + " ... " + a.last._2 + "] collected")
    }
  }

}


> On 17 Jul 2018, at 00:15, Martin, Nick <Ni...@OrbitalATK.com> wrote:
> 
> Is value(index-1) stored in Keyed State, or just a local variable inside the operator?
> 


RE: Parallelism and keyed streams

Posted by "Martin, Nick" <Ni...@OrbitalATK.com>.
Is value(index-1) stored in Keyed State, or just a local variable inside the operator?
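
The distinction matters because one operator instance usually handles several keys, so a plain field is shared between channels while keyed state is scoped to the current key. A plain Scala sketch of the difference, with no Flink dependency; RatioDemo, ratiosPerKey and ratiosSharedVar are hypothetical names, and the per-channel map only stands in for keyed state.

```scala
import scala.collection.mutable

object RatioDemo {
  type Reading = (Int, Long, Double) // (channel, index, value)

  // One "previous value" per channel: the role keyed state would play.
  def ratiosPerKey(in: Seq[Reading]): List[Reading] = {
    val prev = mutable.Map.empty[Int, Double]
    in.flatMap { case (ch, idx, v) =>
      val out = prev.get(ch).map(p => (ch, idx, v / p))
      prev(ch) = v
      out
    }.toList
  }

  // A single variable shared by every channel that passes through:
  // the previous value may belong to a different channel.
  def ratiosSharedVar(in: Seq[Reading]): List[Reading] = {
    var prev: Option[Double] = None
    in.flatMap { case (ch, idx, v) =>
      val out = prev.map(p => (ch, idx, v / p))
      prev = Some(v)
      out
    }.toList
  }

  // Two channels interleaved, each in index order.
  val input: Seq[Reading] =
    Seq((1, 1L, 2.0), (2, 1L, 10.0), (1, 2L, 4.0), (2, 2L, 5.0))

  def main(args: Array[String]): Unit = {
    println(ratiosPerKey(input))
    println(ratiosSharedVar(input))
  }
}
```

With the per-channel map each ratio uses the previous value of the same channel; with the shared variable the ratios mix values from channels 1 and 2.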

