You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/18 18:59:01 UTC
[6/9] flink git commit: [FLINK-7183] Activate checkstyle
flink-java/aggregation
[FLINK-7183] Activate checkstyle flink-java/aggregation
This closes #4332.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afb85f5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afb85f5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afb85f5f
Branch: refs/heads/master
Commit: afb85f5f15bf734163b88c3558851f24f50da2f6
Parents: daed1ee
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:29:06 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 18 17:03:13 2017 +0200
----------------------------------------------------------------------
.../java/aggregation/AggregationFunction.java | 4 +--
.../aggregation/AggregationFunctionFactory.java | 5 ++-
.../api/java/aggregation/Aggregations.java | 13 +++++---
.../aggregation/AvgAggregationFunction.java | 32 +++++++++----------
.../aggregation/MaxAggregationFunction.java | 20 +++++++-----
.../aggregation/MinAggregationFunction.java | 19 +++++++----
.../aggregation/SumAggregationFunction.java | 33 ++++++++++++--------
.../UnsupportedAggregationTypeException.java | 3 ++
8 files changed, 79 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunction.java
index 8117cfe..e59aca4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunction.java
@@ -28,8 +28,8 @@ public abstract class AggregationFunction<T> implements java.io.Serializable {
private static final long serialVersionUID = 1L;
public abstract void initializeAggregate();
-
+
public abstract void aggregate(T value);
-
+
public abstract T getAggregate();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunctionFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunctionFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunctionFactory.java
index b1d4218..a4a3081 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunctionFactory.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AggregationFunctionFactory.java
@@ -20,9 +20,12 @@ package org.apache.flink.api.java.aggregation;
import org.apache.flink.annotation.Internal;
+/**
+ * Interface of factory for creating {@link AggregationFunction}.
+ */
@Internal
public interface AggregationFunctionFactory extends java.io.Serializable {
<T> AggregationFunction<T> createAggregationFunction(Class<T> type);
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/Aggregations.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/Aggregations.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/Aggregations.java
index 403a2b3..1cc186d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/Aggregations.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/Aggregations.java
@@ -20,17 +20,20 @@ package org.apache.flink.api.java.aggregation;
import org.apache.flink.annotation.Public;
+/**
+ * Shortcuts for Aggregation factories.
+ */
@Public
public enum Aggregations {
-
+
SUM (new SumAggregationFunction.SumAggregationFunctionFactory()),
MIN (new MinAggregationFunction.MinAggregationFunctionFactory()),
MAX (new MaxAggregationFunction.MaxAggregationFunctionFactory());
-
+
// --------------------------------------------------------------------------------------------
-
+
private final AggregationFunctionFactory factory;
-
+
private Aggregations(AggregationFunctionFactory factory) {
this.factory = factory;
}
@@ -38,5 +41,5 @@ public enum Aggregations {
public AggregationFunctionFactory getFactory() {
return this.factory;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
index b433d66..4760b92 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
@@ -28,12 +28,12 @@ package org.apache.flink.api.java.aggregation;
// public String toString() {
// return "AVG";
// }
-//
+//
// // --------------------------------------------------------------------------------------------
-//
+//
// public static final class ByteAvgAgg extends AvgAggregationFunction<Byte> {
// private static final long serialVersionUID = 1L;
-//
+//
// private long sum;
// private long count;
//
@@ -54,10 +54,10 @@ package org.apache.flink.api.java.aggregation;
// return (byte) (sum / count);
// }
// }
-//
+//
// public static final class ShortAvgAgg extends AvgAggregationFunction<Short> {
// private static final long serialVersionUID = 1L;
-//
+//
// private long sum;
// private long count;
//
@@ -78,10 +78,10 @@ package org.apache.flink.api.java.aggregation;
// return (short) (sum / count);
// }
// }
-//
+//
// public static final class IntAvgAgg extends AvgAggregationFunction<Integer> {
// private static final long serialVersionUID = 1L;
-//
+//
// private long sum;
// private long count;
//
@@ -102,10 +102,10 @@ package org.apache.flink.api.java.aggregation;
// return (int) (sum / count);
// }
// }
-//
+//
// public static final class LongAvgAgg extends AvgAggregationFunction<Long> {
// private static final long serialVersionUID = 1L;
-//
+//
// private long sum;
// private long count;
//
@@ -126,10 +126,10 @@ package org.apache.flink.api.java.aggregation;
// return sum / count;
// }
// }
-//
+//
// public static final class FloatAvgAgg extends AvgAggregationFunction<Float> {
// private static final long serialVersionUID = 1L;
-//
+//
// private float sum;
// private long count;
//
@@ -150,10 +150,10 @@ package org.apache.flink.api.java.aggregation;
// return sum / count;
// }
// }
-//
+//
// public static final class DoubleAvgAgg extends AvgAggregationFunction<Double> {
// private static final long serialVersionUID = 1L;
-//
+//
// private double sum;
// private long count;
//
@@ -175,9 +175,9 @@ package org.apache.flink.api.java.aggregation;
// }
// }
//
-//
+//
// // --------------------------------------------------------------------------------------------
-//
+//
// public static final class AvgAggregationFunctionFactory implements AggregationFunctionFactory {
// private static final long serialVersionUID = 1L;
//
@@ -203,7 +203,7 @@ package org.apache.flink.api.java.aggregation;
// return (AggregationFunction<T>) new ShortAvgAgg();
// }
// else {
-// throw new UnsupportedAggregationTypeException("The type " + type.getName() +
+// throw new UnsupportedAggregationTypeException("The type " + type.getName() +
// " has currently not supported for built-in average aggregations.");
// }
// }
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
index 69715c3..4657199 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.ResettableValue;
+/**
+ * Implementation of {@link AggregationFunction} for max operation.
+ */
@Internal
public abstract class MaxAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
private static final long serialVersionUID = 1L;
@@ -33,7 +36,7 @@ public abstract class MaxAggregationFunction<T extends Comparable<T>> extends Ag
// --------------------------------------------------------------------------------------------
- public static final class ImmutableMaxAgg<U extends Comparable<U>> extends MaxAggregationFunction<U> {
+ private static final class ImmutableMaxAgg<U extends Comparable<U>> extends MaxAggregationFunction<U> {
private static final long serialVersionUID = 1L;
private U value;
@@ -58,10 +61,10 @@ public abstract class MaxAggregationFunction<T extends Comparable<T>> extends Ag
return value;
}
}
-
+
// --------------------------------------------------------------------------------------------
- public static final class MutableMaxAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MaxAggregationFunction<U> {
+ private static final class MutableMaxAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MaxAggregationFunction<U> {
private static final long serialVersionUID = 1L;
private U value;
@@ -88,12 +91,15 @@ public abstract class MaxAggregationFunction<T extends Comparable<T>> extends Ag
return value;
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
+ /**
+ * Factory for {@link MaxAggregationFunction}.
+ */
public static final class MaxAggregationFunctionFactory implements AggregationFunctionFactory {
private static final long serialVersionUID = 1L;
-
+
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
@@ -104,7 +110,7 @@ public abstract class MaxAggregationFunction<T extends Comparable<T>> extends Ag
return (AggregationFunction<T>) new ImmutableMaxAgg();
}
} else {
- throw new UnsupportedAggregationTypeException("The type " + type.getName() +
+ throw new UnsupportedAggregationTypeException("The type " + type.getName() +
" is not supported for maximum aggregation. " +
"Maximum aggregatable types must implement the Comparable interface.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
index c7ebcc5..a5e9d31 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
@@ -22,6 +22,10 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.ResettableValue;
+/**
+ * Implementations of {@link AggregationFunction} for min operation.
+ * @param <T> aggregating type
+ */
@Internal
public abstract class MinAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
private static final long serialVersionUID = 1L;
@@ -33,7 +37,7 @@ public abstract class MinAggregationFunction<T extends Comparable<T>> extends Ag
// --------------------------------------------------------------------------------------------
- public static final class ImmutableMinAgg<U extends Comparable<U>> extends MinAggregationFunction<U> {
+ private static final class ImmutableMinAgg<U extends Comparable<U>> extends MinAggregationFunction<U> {
private static final long serialVersionUID = 1L;
private U value;
@@ -61,7 +65,7 @@ public abstract class MinAggregationFunction<T extends Comparable<T>> extends Ag
// --------------------------------------------------------------------------------------------
- public static final class MutableMinAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MinAggregationFunction<U> {
+ private static final class MutableMinAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MinAggregationFunction<U> {
private static final long serialVersionUID = 1L;
private U value;
@@ -88,12 +92,15 @@ public abstract class MinAggregationFunction<T extends Comparable<T>> extends Ag
return value;
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
+ /**
+ * Factory for {@link MinAggregationFunction}.
+ */
public static final class MinAggregationFunctionFactory implements AggregationFunctionFactory {
private static final long serialVersionUID = 1L;
-
+
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
@@ -104,7 +111,7 @@ public abstract class MinAggregationFunction<T extends Comparable<T>> extends Ag
return (AggregationFunction<T>) new ImmutableMinAgg();
}
} else {
- throw new UnsupportedAggregationTypeException("The type " + type.getName() +
+ throw new UnsupportedAggregationTypeException("The type " + type.getName() +
" is not supported for minimum aggregation. " +
"Minimum aggregatable types must implement the Comparable interface.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
index 7fb1f7b..65c8f5d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
@@ -26,6 +26,10 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
+/**
+ * Definitions of sum functions for different numerical types.
+ * @param <T> type of elements being summed
+ */
@Internal
public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
@@ -38,7 +42,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
// --------------------------------------------------------------------------------------------
- public static final class ByteSumAgg extends SumAggregationFunction<Byte> {
+ private static final class ByteSumAgg extends SumAggregationFunction<Byte> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -59,7 +63,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class ByteValueSumAgg extends SumAggregationFunction<ByteValue> {
+ private static final class ByteValueSumAgg extends SumAggregationFunction<ByteValue> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -80,7 +84,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class ShortSumAgg extends SumAggregationFunction<Short> {
+ private static final class ShortSumAgg extends SumAggregationFunction<Short> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -101,7 +105,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class ShortValueSumAgg extends SumAggregationFunction<ShortValue> {
+ private static final class ShortValueSumAgg extends SumAggregationFunction<ShortValue> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -122,7 +126,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class IntSumAgg extends SumAggregationFunction<Integer> {
+ private static final class IntSumAgg extends SumAggregationFunction<Integer> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -143,7 +147,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class IntValueSumAgg extends SumAggregationFunction<IntValue> {
+ private static final class IntValueSumAgg extends SumAggregationFunction<IntValue> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -164,7 +168,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class LongSumAgg extends SumAggregationFunction<Long> {
+ private static final class LongSumAgg extends SumAggregationFunction<Long> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -185,7 +189,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class LongValueSumAgg extends SumAggregationFunction<LongValue> {
+ private static final class LongValueSumAgg extends SumAggregationFunction<LongValue> {
private static final long serialVersionUID = 1L;
private long agg;
@@ -206,7 +210,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class FloatSumAgg extends SumAggregationFunction<Float> {
+ private static final class FloatSumAgg extends SumAggregationFunction<Float> {
private static final long serialVersionUID = 1L;
private double agg;
@@ -227,7 +231,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class FloatValueSumAgg extends SumAggregationFunction<FloatValue> {
+ private static final class FloatValueSumAgg extends SumAggregationFunction<FloatValue> {
private static final long serialVersionUID = 1L;
private double agg;
@@ -248,7 +252,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class DoubleSumAgg extends SumAggregationFunction<Double> {
+ private static final class DoubleSumAgg extends SumAggregationFunction<Double> {
private static final long serialVersionUID = 1L;
private double agg;
@@ -269,7 +273,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
}
}
- public static final class DoubleValueSumAgg extends SumAggregationFunction<DoubleValue> {
+ private static final class DoubleValueSumAgg extends SumAggregationFunction<DoubleValue> {
private static final long serialVersionUID = 1L;
private double agg;
@@ -292,6 +296,9 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
// --------------------------------------------------------------------------------------------
+ /**
+ * Factory for {@link SumAggregationFunction}.
+ */
public static final class SumAggregationFunctionFactory implements AggregationFunctionFactory {
private static final long serialVersionUID = 1L;
@@ -335,7 +342,7 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
return (AggregationFunction<T>) new ShortValueSumAgg();
}
else {
- throw new UnsupportedAggregationTypeException("The type " + type.getName() +
+ throw new UnsupportedAggregationTypeException("The type " + type.getName() +
" is currently not supported for built-in sum aggregations.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afb85f5f/flink-java/src/main/java/org/apache/flink/api/java/aggregation/UnsupportedAggregationTypeException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/UnsupportedAggregationTypeException.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/UnsupportedAggregationTypeException.java
index 694e500..23a099f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/UnsupportedAggregationTypeException.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/UnsupportedAggregationTypeException.java
@@ -20,6 +20,9 @@ package org.apache.flink.api.java.aggregation;
import org.apache.flink.annotation.PublicEvolving;
+/**
+ * Exception indicating an unsupported type was used for an aggregation.
+ */
@PublicEvolving
public class UnsupportedAggregationTypeException extends RuntimeException {