Posted to user@flink.apache.org by Balaji Rajagopalan <ba...@olacabs.com> on 2016/03/24 18:21:12 UTC

Does the reduce function have a bug?

I have a keyed input stream on a DataStream[(String,Int)] and wrote a reduce on
the keyedStream. The reduce is a simple one that sums up the integer values of
the same key.

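import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

// Sums the integer values of two records that share the same key
class MyReduceFunction extends ReduceFunction[(String, Int)] {
  override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) =
    (in._1, in._2 + in1._2)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The sample input shown below
val stream: DataStream[(String, Int)] = env.fromElements(("k1", 1), ("k1", 1), ("k2", 1))
val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
keyedStream.print()
env.execute()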

Here is my sample input stream:
("k1",1)
("k1",1)
("k2",1)

I was expecting the above program to output
("k1",2)
("k2",1)

whereas I got this:
("k1",1)
("k1",2)
("k2",1)

Isn't this incorrect output?

Balaji

Re: Does the reduce function have a bug?

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Never mind, Till, I figured out a way: instead of doing the aggregation in the
reduce, I moved that logic into the apply of the window function.
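
To illustrate what moving the aggregation into the window changes, here is a plain-Scala simulation (not Flink code, and not Flink's windowing implementation) of summing per key inside tumbling windows. The Event class, the timestamps and the one-minute window size are hypothetical, and it assumes non-negative timestamps. The point is that each key yields one result per window, such as ("k1",2) for the sample input, instead of one partial result per incoming record.

```scala
// Plain-Scala simulation of per-key sums over tumbling windows (not Flink's API).
// Each record carries a (hypothetical) timestamp; records are bucketed into
// fixed-size, non-overlapping windows and one sum is produced per key and window.
object TumblingWindowSketch {
  final case class Event(key: String, value: Int, timestampMs: Long)

  // Start of the tumbling window containing the timestamp (assumes timestampMs >= 0)
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Returns (windowStart, key, sum) triples, one per key and window, sorted for readability
  def sumPerWindow(events: Seq[Event], sizeMs: Long): Seq[(Long, String, Int)] =
    events
      .groupBy(e => (windowStart(e.timestampMs, sizeMs), e.key))
      .toSeq
      .map { case ((start, key), group) => (start, key, group.map(_.value).sum) }
      .sortBy { case (start, key, _) => (start, key) }

  def main(args: Array[String]): Unit = {
    val oneMinuteMs = 60000L
    // Hypothetical timestamps: all three sample records fall into the first minute
    val events = Seq(Event("k1", 1, 1000L), Event("k1", 1, 2000L), Event("k2", 1, 3000L))
    sumPerWindow(events, oneMinuteMs).foreach(println)
  }
}
```

Records that arrive later than the end of a window simply land in the next window's sum rather than updating an earlier one.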

On Thu, Mar 24, 2016 at 11:33 PM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

Re: Does the reduce function have a bug?

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Till,

  Thanks for your reply; maybe I should have given more details. The stream
in my first example, a DataStream[(String,Int)], is already windowed.
Ideally I have all the data that I need in my data stream; all I'm trying to
do is build something like a HashMap[String,Int] from (String,Int) tuples. If
reduce is not the best solution, can you please suggest another way to do the
same?

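import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
// someSource, xmin and mywindowfunc are placeholders; elements are assumed to be (String,Int) tuples
val source: DataStream[(String, Int)] = someSource
val stream = source
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(xmin)))
  .apply { (x: String, y: TimeWindow, z: Iterable[(String, Int)], w: Collector[(String, Int)]) =>
    mywindowfunc(x, y, z, w) }
val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction) // not windowed: emits a partial sum per record
keyedStream.print()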
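
If the goal is a HashMap[String,Int] built from the (String,Int) tuples of a window, the aggregation itself can be a simple fold. The sketch below is plain Scala, independent of Flink's API; the names WindowToMapSketch and toCounts are hypothetical, and it treats the sample input from the original question as the contents of a single window. Note that in a keyed window like the one above, each call of the window function only sees the tuples of one key, so such a map would hold a single entry per call.

```scala
// Folds the (key, count) tuples of one window into a single Map[String, Int],
// adding up the counts of tuples that share a key (plain Scala, not Flink's API).
object WindowToMapSketch {
  def toCounts(tuples: Iterable[(String, Int)]): Map[String, Int] =
    tuples.foldLeft(Map.empty[String, Int]) { case (acc, (key, count)) =>
      acc.updated(key, acc.getOrElse(key, 0) + count)
    }

  def main(args: Array[String]): Unit = {
    // The sample input from the original question, treated as one window's contents
    val window = Seq(("k1", 1), ("k1", 1), ("k2", 1))
    println(toCounts(window))
  }
}
```

An immutable Map keeps the fold free of shared state; a mutable HashMap would work equally well inside a single function call.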

Balaji


On Thu, Mar 24, 2016 at 11:21 PM, Till Rohrmann <tr...@apache.org>
wrote:

Re: Does the reduce function have a bug?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Balaji,

the output you see is correct, because you're computing a continuous (rolling)
reduce over the incoming data. Since you haven't defined a time frame for your
reduce computation, Flink would either have to wait for all eternity before
emitting the final result, or emit the current result (which is, of course,
partial) every time it computes a new reduce result. Since the first option is
not very practical, Flink emits the partial reduce results.
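
The behaviour described above can be reproduced without Flink. The following plain-Scala simulation (a sketch, not Flink's implementation; the names RollingReduceSketch, sum and rollingReduce are hypothetical) keeps one running value per key, combines each new record with it using the same logic as MyReduceFunction, and emits the updated value immediately. On the sample input it yields the three records from the original post: ("k1",1), ("k1",2) and ("k2",1).

```scala
import scala.collection.mutable

// Simulates a keyed, non-windowed ("rolling") reduce: every incoming record
// updates the per-key state, and the updated (partial) result is emitted at once.
object RollingReduceSketch {
  // Same logic as MyReduceFunction: keep the key, add the counts
  def sum(a: (String, Int), b: (String, Int)): (String, Int) = (a._1, a._2 + b._2)

  def rollingReduce(input: Seq[(String, Int)]): Seq[(String, Int)] = {
    val state = mutable.Map.empty[String, (String, Int)]
    input.map { record =>
      // The first record of a key is emitted as-is; later ones are combined with the state
      val updated = state.get(record._1) match {
        case Some(previous) => sum(previous, record)
        case None           => record
      }
      state(record._1) = updated
      updated // emitted immediately, i.e. a partial result
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(("k1", 1), ("k1", 1), ("k2", 1))
    rollingReduce(input).foreach(println)
  }
}
```

Since the stream never ends, there is no point at which the simulation could know that ("k1",2) is final; that is exactly why the partial results are emitted.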

Cheers,
Till

On Thu, Mar 24, 2016 at 6:21 PM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote: