Posted to user@flink.apache.org by Komal Mariam <ko...@gmail.com> on 2019/10/03 06:25:50 UTC

Finding the Maximum Value Received so far in a Stream

Hello all,
I'm trying to do a fairly simple task: find the maximum value (a Double)
received so far in a stream. This is what I implemented:

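POJO class:
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;

public class Fish {
    private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();

    public Integer fish_id;
    public Point point;   // position

    public Fish() {}

    public Fish(Integer fish_id, double x, double y) {
        this.fish_id = fish_id;
        this.point = GEOMETRY_FACTORY.createPoint(new Coordinate(x, y));
    }
}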

Java_main.java
DataStream<ObjectNode> text = env
        .addSource(new FlinkKafkaConsumer<>("test",
                new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest());

DataStream<Fish> fishes = text.map(new MapFunction<ObjectNode, Fish>() {
    @Override
    public Fish map(ObjectNode json) throws Exception {
        // the deserializer puts the Kafka record value under "value"
        JsonNode value = json.get("value");
        return new Fish(value.get("id").asInt(),
                value.get("x_begin").asDouble(),
                value.get("y_begin").asDouble());
    }
});


I can't seem to apply aggregations on the Point class without extracting
the x coordinates into a separate stream, so here are the two methods I have
tried:

Method 1: Simple reduce


KeyedStream<Fish, Integer> keyed = fishes.keyBy(s -> s.fish_id);
keyed.reduce(new ReduceFunction<Fish>() {
    @Override
    public Fish reduce(Fish t, Fish t1) throws Exception {
        // keep whichever fish has the larger x coordinate
        return t.point.getX() > t1.point.getX() ? t : t1;
    }
}).map(new MapFunction<Fish, Double>() {
    @Override
    public Double map(Fish t) throws Exception {
        return t.point.getX();
    }
}).print();



Result:
1> -73.8517632
1> -73.851446
1> -73.851446
1> -73.8505642
1> -73.851446       //smaller than previous value!
1> -73.851446
1> -73.851446
1> -73.8505642
1> -73.8517632
1> -73.851446
3> -73.85012
3> -73.850212
3> -73.851979       //smaller than previous value
3> -73.850212
3> -73.8512969
3> -73.8512969

1) I'm trying to compute the max, so why do I see smaller values after the
latest maximum value? Is this because the order of the outputs does not
match the order of the inputs? If so, how do I ensure that the order is
preserved and I only see the latest maximum value?

2) Another thing I have noticed: instance 1 produces a max of, say,
-73.8505642, but then instance 2 produces -73.9064, which is again smaller
than the value produced by instance 1. I'm assuming it's because there is no
communication between parallel instances, hence they produce two values. If
so, how do I get ONE maximum value from all the parallel instances combined?
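A keyed reduce keeps one running result per key and emits the updated result for every incoming element, and print() prefixes each line with the index of the parallel subtask that produced it. Several fish_id keys can be assigned to the same subtask, so consecutive lines with the same "1>" prefix may belong to different fish. The plain-Java model below is a hypothetical illustration (not Flink code; the class name and sample elements are made up) of that effect: each key's own sequence never decreases, but the interleaved output can.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed running-max reduce on ONE parallel subtask.
// Hypothetical illustration only; this is not Flink code.
public class KeyedRunningMax {

    // One input element: which fish it belongs to and its x coordinate.
    static final class Event {
        final int fishId;
        final double x;

        Event(int fishId, double x) {
            this.fishId = fishId;
            this.x = x;
        }
    }

    // Emits the running max of the element's own key after every element,
    // the way a keyed reduce emits one updated result per input element.
    static List<Double> emitPerKeyMax(List<Event> events) {
        Map<Integer, Double> maxPerKey = new HashMap<>();
        List<Double> emitted = new ArrayList<>();
        for (Event e : events) {
            Double updated = maxPerKey.merge(e.fishId, e.x, Math::max);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical elements for two fish that happen to share one subtask.
        List<Event> events = List.of(
                new Event(1, -73.85),
                new Event(2, -73.86),
                new Event(1, -73.84),
                new Event(2, -73.87));
        // Each fish's own maximum never decreases, but the interleaved output does.
        System.out.println(emitPerKeyMax(events)); // [-73.85, -73.86, -73.84, -73.86]
    }
}
```

The drop from -73.85 to -73.86 in the printed list is not a wrong maximum: the second value is simply the maximum of a different key.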

Method 2: Using States

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;

public class GetMaximum extends RichMapFunction<Fish, Fish> {

    // holds the fish with the largest x seen so far
    private transient ValueState<Fish> max;

    @Override
    public Fish map(Fish input) throws Exception {

        // access the state value (null until something has been stored)
        Fish currentMaximum = max.value();

        if (currentMaximum == null || input.point.getX() > currentMaximum.point.getX()) {
            max.update(input);
            return input;
        }
        return currentMaximum;
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Fish> descriptor =
                new ValueStateDescriptor<>(
                        "max", // the state name
                        TypeInformation.of(new TypeHint<Fish>() {})); // type information
        max = getRuntimeContext().getState(descriptor);
    }
}

3) I'm getting the same results as with Method 1. Isn't ValueState shared
between all instances of the same operator?

4) Of the two approaches, which is the better choice?
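Regarding question 3: state obtained through getRuntimeContext().getState(...) on a keyed stream is keyed state. Flink keeps one value per key, and each key is handled by exactly one parallel instance, so nothing is shared between keys. Keyed by fish_id, Method 2 therefore tracks one maximum per fish, just like Method 1. The toy model below is a hypothetical plain-Java sketch (class and method names are made up, and this is not Flink's implementation) that imitates this scoping with a map from key to value plus a "current key".

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of how keyed ValueState is scoped; hypothetical, not Flink's implementation.
public class KeyedValueStateModel {

    private final Map<Object, Double> slots = new HashMap<>();
    private Object currentKey;

    // Before each element is processed, the current key is set to that element's key.
    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    // value() and update() only ever touch the slot of the current key.
    Double value() {
        return slots.get(currentKey);
    }

    void update(Double v) {
        slots.put(currentKey, v);
    }

    // Mirrors the GetMaximum logic for the x coordinate, with a null check
    // standing in for a default state value.
    double processX(Object key, double x) {
        setCurrentKey(key);
        Double current = value();
        if (current == null || x > current) {
            update(x);
            return x;
        }
        return current;
    }

    public static void main(String[] args) {
        // Keyed by fish id: fish 2 never sees the maximum stored for fish 1.
        KeyedValueStateModel perFish = new KeyedValueStateModel();
        perFish.processX(1, -73.85);
        System.out.println(perFish.processX(2, -73.90)); // -73.9

        // One constant key: every element reads and updates the same slot.
        KeyedValueStateModel global = new KeyedValueStateModel();
        global.processX(0, -73.85);
        System.out.println(global.processX(0, -73.90)); // -73.85
    }
}
```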

Would really appreciate your help!

Best Regards,
Komal

Re: Finding the Maximum Value Received so far in a Stream

Posted by Jörn Franke <jo...@gmail.com>.
You cannot reliably compare doubles in Java (or in other languages), because
double has limited precision and values are rounded. See here for some
examples and discussion:

https://howtodoinjava.com/java/basics/correctly-compare-float-double/
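Rounding mostly bites exact equality checks; an ordering test such as a > b compares the two stored binary values exactly. The sketch below (hypothetical class name; the coordinates are copied from the printed output earlier in this thread) checks that > on doubles and BigDecimal.compareTo order those values the same way. That fits the later report in this thread that switching to BigDecimal did not change the results.

```java
import java.math.BigDecimal;

public class DoubleOrderingCheck {

    // Returns true when '>' on doubles and BigDecimal.compareTo agree for every pair.
    static boolean orderingAgrees(double[] values) {
        for (double a : values) {
            for (double b : values) {
                boolean byDouble = a > b;
                // new BigDecimal(double) holds the exact binary value, no further rounding
                boolean byBigDecimal = new BigDecimal(a).compareTo(new BigDecimal(b)) > 0;
                if (byDouble != byBigDecimal) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // x values taken from the printed output earlier in the thread
        double[] xs = {-73.8517632, -73.851446, -73.8505642, -73.85012,
                       -73.850212, -73.851979, -73.8512969};
        System.out.println("ordering agrees: " + orderingAgrees(xs)); // ordering agrees: true

        // Where rounding does show up: exact equality after arithmetic.
        System.out.println(0.1 + 0.2 == 0.3); // false
    }
}
```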


Re: Finding the Maximum Value Received so far in a Stream

Posted by Komal Mariam <ko...@gmail.com>.
Thank you all for your help. I understand now and have made the changes.

Since I needed to return the entire object that contained the max value of X,
I used reduce instead of max.
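For reference, Flink's documentation distinguishes max(field), which only guarantees that the given field holds the running maximum, from maxBy(field), which returns the whole element holding it. The plain-Java model below is a hypothetical illustration (made-up names; in the max model the other fields are assumed to stay those of the first element, a simplification of Flink's behaviour) of why max alone does not return the matching fish. Both operators also select the compared value by field position or field expression, whereas x here is only reachable through getX() on the JTS Point, so a reduce that compares point.getX() is a direct way to keep the whole object.

```java
import java.util.List;

// Plain-Java model contrasting max-style and maxBy-style running aggregates.
// Hypothetical illustration; not Flink's implementation.
public class MaxVersusMaxBy {

    static final class Sample {
        final int id;
        final double x;

        Sample(int id, double x) {
            this.id = id;
            this.x = x;
        }

        @Override
        public String toString() {
            return "Sample(id=" + id + ", x=" + x + ")";
        }
    }

    // max-style: start from the first element and overwrite only x when a larger x arrives.
    static Sample runningMax(List<Sample> in) {
        Sample acc = in.get(0);
        for (Sample s : in.subList(1, in.size())) {
            if (s.x > acc.x) {
                acc = new Sample(acc.id, s.x); // id stays from the first element
            }
        }
        return acc;
    }

    // maxBy-style: keep the whole element with the larger x (ties keep the earlier one).
    static Sample runningMaxBy(List<Sample> in) {
        Sample acc = in.get(0);
        for (Sample s : in.subList(1, in.size())) {
            acc = s.x > acc.x ? s : acc;
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Sample> in = List.of(new Sample(7, -73.86), new Sample(9, -73.85));
        System.out.println(runningMax(in));   // Sample(id=7, x=-73.85): fields from two elements
        System.out.println(runningMaxBy(in)); // Sample(id=9, x=-73.85): the actual element
    }
}
```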

Re: Finding the Maximum Value Received so far in a Stream

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi Komal, 

Regarding the max method: you can call .map() on your stream to convert the POJO into another type, e.g. one holding only the x coordinate, and then apply the max operator. 

And as the others said: You are working on a keyed stream per fish_id, so you will get one maximum per fish-id. If you want to have a global maximum, you should not key by fish_id first. 

Best regards 
Theo 
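Both of Theo's suggestions can be modelled together: first map each element to its x coordinate, then keep a single running maximum because every element shares one key (in Flink, a key selector that returns the same constant for every element; with a single key, all elements are processed by one parallel instance). The plain-Java sketch below uses a hypothetical class and made-up sample values, not Flink code, and shows that the emitted sequence then never decreases.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "map to x, then one running max under a single constant key".
// Hypothetical illustration; not Flink code.
public class GlobalRunningMax {

    // Every element shares the same key, so there is exactly one running maximum.
    static List<Double> emitGlobalMax(List<Double> xs) {
        List<Double> emitted = new ArrayList<>();
        double current = Double.NEGATIVE_INFINITY;
        for (double x : xs) {
            current = Math.max(current, x);
            emitted.add(current);
        }
        return emitted;
    }

    static boolean isNonDecreasing(List<Double> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i) < values.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical x values already extracted from the Fish objects (the "map" step).
        List<Double> xs = List.of(-73.85, -73.86, -73.84, -73.87);
        List<Double> out = emitGlobalMax(xs);
        System.out.println(out);                  // [-73.85, -73.85, -73.84, -73.84]
        System.out.println(isNonDecreasing(out)); // true
    }
}
```

The trade-off is that all elements funnel through one instance, which is what makes the single maximum possible.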




Re: Finding the Maximum Value Received so far in a Stream

Posted by Komal Mariam <ko...@gmail.com>.
Hi Jörn!

Thanks for your suggestions.

Btw, just a correction: in the Fish class it's "public Point point", not
"public Point coordinate".

For double type comparison, I implemented what you suggested and used
BigDecimal for their comparison. I'm still getting the same results where I
see smaller values after the latest maximum value.

I would prefer using the max method but I need to do max on the x-value
inside the Point object contained inside the Fish object. (I retrieve it
using fish.point.getX();)

The max method on a keyed stream works by providing the field name as a
string, keyedStream.max("point"), so is there a way I can reference the x
coordinate inside the point object to max on?
Is there a way I can implement keyedStream.max(fish.point.getX())?

> About your reduce function:
> You execute it by fish_id if I see it correctly. This will create one
> result by fish_id. I propose to map first all fish coordinates under a
> single key and then reduce by this single key.


^Will try this next.

Regards,
Komal






Re: Finding the Maximum Value Received so far in a Stream

Posted by Jörn Franke <jo...@gmail.com>.
Btw. Why don’t you use the max method?  

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#max-java.lang.String-

See here about the state solution:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html

About your reduce function:
You key it by fish_id, if I see it correctly. This will create one result per fish_id. I propose to first map all fish coordinates to a single key and then reduce by that single key. 
