Posted to issues@flink.apache.org by greghogan <gi...@git.apache.org> on 2015/09/29 17:22:53 UTC

[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/1191

    [FLINK-2725] Max/Min/Sum Aggregation of mutable types

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 2725_max_min_sum_aggregation_of_mutable_types

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1191
    
----
commit c828dedb8ea05dbfaec06765e3effd7c95b13516
Author: Greg Hogan <co...@greghogan.com>
Date:   2015-09-22T17:01:47Z

    [FLINK-2725] Max/Min/Sum Aggregation of mutable types

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1191#discussion_r41641649
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java ---
    @@ -113,11 +182,32 @@ public Long getAggregate() {
     			return agg;
     		}
     	}
    -	
    +
    +	public static final class LongValueSumAgg extends SumAggregationFunction<LongValue> {
    +		private static final long serialVersionUID = 1L;
    +
    +		private long agg;
    +
    +		@Override
    +		public void initializeAggregate() {
    +			agg = 0L;
    +		}
    +
    +		@Override
    +		public void aggregate(LongValue value) {
    +			agg += value.getValue();
    +		}
    +
    +		@Override
    +		public LongValue getAggregate() {
    +			return new LongValue(agg);
    +		}
    +	}
    +
     	public static final class FloatSumAgg extends SumAggregationFunction<Float> {
     		private static final long serialVersionUID = 1L;
    -		
    -		private float agg;
    +
    +		private double agg;
    --- End diff --
    
    The extra precision is beneficial when locally accumulating millions to billions of values, given that a float carries only a 23-bit mantissa. Is there a performance advantage to using float over double and removing the implicit cast?
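
The precision argument above can be illustrated with a minimal, self-contained sketch. It is not the code from the pull request: the class name `FloatSumPrecision` and both method names are hypothetical, and narrowing the double accumulator back to a float at the end is an assumption about how the result would be returned. Summing 1.0f twenty million times in a float accumulator stalls at 2^24 = 16777216, because the next integer is not representable in a float, while a double accumulator reaches the exact total.

```java
public class FloatSumPrecision {

    // Accumulates in a float. Once the running sum reaches 2^24,
    // adding 1.0f rounds back to the same value and the sum stops growing.
    static float sumInFloat(int count) {
        float agg = 0f;
        for (int i = 0; i < count; i++) {
            agg += 1.0f;
        }
        return agg;
    }

    // Accumulates in a double and narrows to float only once, at the end
    // (an assumption about how the aggregate would be returned).
    static float sumInDouble(int count) {
        double agg = 0d;
        for (int i = 0; i < count; i++) {
            agg += 1.0f; // the float operand is widened to double implicitly
        }
        return (float) agg;
    }

    public static void main(String[] args) {
        int count = 20_000_000;
        System.out.println(sumInFloat(count));  // prints 1.6777216E7
        System.out.println(sumInDouble(count)); // prints 2.0E7
    }
}
```

The sketch says nothing about relative speed, so the performance question raised above remains open.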



[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1191#issuecomment-148730932
  
    Thanks for the PR!
    I will merge it with the next batch.



[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1191#discussion_r41595115
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java ---
    @@ -113,11 +182,32 @@ public Long getAggregate() {
     			return agg;
     		}
     	}
    -	
    +
    +	public static final class LongValueSumAgg extends SumAggregationFunction<LongValue> {
    +		private static final long serialVersionUID = 1L;
    +
    +		private long agg;
    +
    +		@Override
    +		public void initializeAggregate() {
    +			agg = 0L;
    +		}
    +
    +		@Override
    +		public void aggregate(LongValue value) {
    +			agg += value.getValue();
    +		}
    +
    +		@Override
    +		public LongValue getAggregate() {
    +			return new LongValue(agg);
    +		}
    +	}
    +
     	public static final class FloatSumAgg extends SumAggregationFunction<Float> {
     		private static final long serialVersionUID = 1L;
    -		
    -		private float agg;
    +
    +		private double agg;
    --- End diff --
    
    Why are you storing the float sum in a double?



[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1191



[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1191#issuecomment-144653905
  
    Looks like a solid addition.
    
    Out of curiosity: Have you run any experiments or microbenchmarks to see how this impacts performance?



[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1191#discussion_r41595057
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---
    @@ -51,7 +51,18 @@
     public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> {
     
     	private static final long serialVersionUID = 1L;
    -	
    +
    +	public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class);
    +	public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class);
    +	public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class);
    +	public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class);
    +	public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class);
    +	public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class);
    +	public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class);
    +	public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class);
    +	public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class);
    +	public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class);
    --- End diff --
    
    Is the purpose of these statics that users don't have to instantiate them themselves?
    I'm just wondering because IntelliJ is marking some of them as unused.



[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1191#discussion_r41635292
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java ---
    @@ -51,7 +51,18 @@
     public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> {
     
     	private static final long serialVersionUID = 1L;
    -	
    +
    +	public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class);
    +	public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class);
    +	public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class);
    +	public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class);
    +	public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class);
    +	public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class);
    +	public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class);
    +	public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class);
    +	public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class);
    +	public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class);
    --- End diff --
    
    Yes, these follow the form of the `BasicTypeInfo` statics (which, admittedly, are much longer instantiations). They are used extensively in `ValueCollectionDataSets` to create `TupleTypeInfo` instances; that class mirrors `CollectionDataSets`, which uses the `BasicTypeInfo` statics for the same purpose.
    
    I am surprised IntelliJ is marking public members as unused.



[GitHub] flink pull request: [FLINK-2725] Max/Min/Sum Aggregation of mutabl...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1191#issuecomment-144789600
  
    @StephanEwen, I have not yet created or run any microbenchmarks. Are you thinking of comparing immutable aggregators with and without this patch or comparing mutable and immutable aggregators?

