You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/09 20:08:48 UTC
[4/4] incubator-flink git commit: [streaming] Updated streaming
groupBy operators to allow grouping on field expressions
[streaming] Updated streaming groupBy operators to allow grouping on field expressions
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c9f3846f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c9f3846f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c9f3846f
Branch: refs/heads/master
Commit: c9f3846fa7fa673f23583db554a29a5ee309d1d7
Parents: e580497
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Nov 4 23:00:18 2014 +0100
Committer: mbalassi <ba...@gmail.com>
Committed: Sun Nov 9 13:16:46 2014 +0100
----------------------------------------------------------------------
.../api/datastream/BatchedDataStream.java | 28 ++++-
.../api/datastream/CoBatchedDataStream.java | 22 ++++
.../api/datastream/CoWindowDataStream.java | 26 +++++
.../api/datastream/ConnectedDataStream.java | 71 +++++++++++++
.../streaming/api/datastream/DataStream.java | 106 +++++++++++++++++--
.../api/datastream/WindowDataStream.java | 9 +-
.../api/function/co/JoinWindowFunction.java | 15 ++-
.../streaming/util/keys/ArrayKeySelector.java | 45 ++++++++
.../streaming/util/keys/FieldsKeySelector.java | 83 ++++-----------
.../streaming/util/keys/ObjectKeySelector.java | 30 ++++++
.../streaming/util/keys/PojoKeySelector.java | 98 +++++++++++++++++
.../streaming/util/keys/TupleKeySelector.java | 44 ++++++++
.../streaming/api/AggregationFunctionTest.java | 11 +-
.../operator/CoGroupedBatchReduceTest.java | 10 +-
.../invokable/operator/CoGroupedReduceTest.java | 8 +-
.../operator/CoGroupedWindowReduceTest.java | 10 +-
.../operator/GroupedBatchGroupReduceTest.java | 9 +-
.../operator/GroupedBatchReduceTest.java | 7 +-
.../operator/GroupedReduceInvokableTest.java | 4 +-
.../GroupedWindowGroupReduceInvokableTest.java | 4 +-
.../operator/WindowReduceInvokableTest.java | 5 +-
.../partitioner/FieldsPartitionerTest.java | 5 +-
.../streaming/util/FieldsKeySelectorTest.java | 41 ++-----
.../java/typeutils/runtime/PojoComparator.java | 2 +-
24 files changed, 543 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 75eadcf..86cb90b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -75,15 +75,33 @@ public class BatchedDataStream<OUT> {
/**
* Groups the elements of the {@link BatchedDataStream} by the given key
- * position to be used with grouped operators.
+ * positions to be used with grouped operators.
*
- * @param keyPosition
- * The position of the field on which the
+ * @param fields
+ * The position of the fields on which the
* {@link BatchedDataStream} will be grouped.
* @return The transformed {@link BatchedDataStream}
*/
- public BatchedDataStream<OUT> groupBy(int keyPosition) {
- return new BatchedDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize);
+ public BatchedDataStream<OUT> groupBy(int... fields) {
+ return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
+ }
+
+ /**
+ * Groups a {@link BatchedDataStream} using field expressions. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link BatchedDataStream}S underlying type. A dot can
+ * be used to drill down into objects, as in
+ * {@code "field1.getInnerField2()" }.
+ *
+ * @param fields
+ * One or more field expressions on which the DataStream will be
+ * grouped.
+ * @return The grouped {@link BatchedDataStream}
+ **/
+ public BatchedDataStream<OUT> groupBy(String... fields) {
+
+ return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
+
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
index bfe679d..387c356 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
@@ -83,6 +84,27 @@ public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2>
dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2);
}
+ public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
+ return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
+ dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2);
+ }
+
+ public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
+ return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(field1),
+ dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2);
+ }
+
+ public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
+ return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
+ dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2);
+ }
+
+ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
+ KeySelector<IN2, ?> keySelector2) {
+ return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1),
+ dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2);
+ }
+
@Override
protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
CoReduceFunction<IN1, IN2, OUT> coReducer) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
index 95165df..6e47873 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -65,6 +66,31 @@ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2>
timeStamp1, timeStamp2);
}
+ public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
+ return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
+ dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2,
+ timeStamp1, timeStamp2);
+ }
+
+ public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
+ return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(field1),
+ dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2,
+ timeStamp1, timeStamp2);
+ }
+
+ public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
+ return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
+ dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2,
+ timeStamp1, timeStamp2);
+ }
+
+ public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
+ KeySelector<IN2, ?> keySelector2) {
+ return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1),
+ dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2,
+ timeStamp1, timeStamp2);
+ }
+
@Override
protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
CoReduceFunction<IN1, IN2, OUT> coReducer) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 33dbc7e..23c420c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -154,6 +154,60 @@ public class ConnectedDataStream<IN1, IN2> {
/**
* GroupBy operation for connected data stream. Groups the elements of
+ * input1 and input2 according to keyPositions1 and keyPositions2. Used for
+ * applying function on grouped data streams for example
+ * {@link ConnectedDataStream#reduce}
+ *
+ * @param keyPositions1
+ * The fields used to group the first input stream.
+ * @param keyPositions2
+ * The fields used to group the second input stream.
+ * @return @return The transformed {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
+ dataStream2.groupBy(keyPositions2));
+ }
+
+ /**
+ * GroupBy operation for connected data stream using key expressions. Groups
+ * the elements of input1 and input2 according to field1 and field2. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream}S underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field1
+ * The grouping expression for the first input
+ * @param field2
+ * The grouping expression for the second input
+ * @return The grouped {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(field1),
+ dataStream2.groupBy(field2));
+ }
+
+ /**
+ * GroupBy operation for connected data stream using key expressions. Groups
+ * the elements of input1 and input2 according to fields1 and fields2. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}S underlying type. A dot can be
+ * used to drill down into objects, as in {@code "field1.getInnerField2()" }
+ * .
+ *
+ * @param fields1
+ * The grouping expressions for the first input
+ * @param fields2
+ * The grouping expressions for the second input
+ * @return The grouped {@link ConnectedDataStream}
+ */
+ public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
+ return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
+ dataStream2.groupBy(fields2));
+ }
+
+ /**
+ * GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 using keySelector1 and keySelector2. Used for applying
* function on grouped data streams for example
* {@link ConnectedDataStream#reduce}
@@ -510,6 +564,13 @@ public class ConnectedDataStream<IN1, IN2> {
}
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+ String fieldIn1, String fieldIn2) {
+
+ return windowJoin(windowSize, slideInterval, new DefaultTimeStamp<IN1>(),
+ new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+ }
+
+ SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
int fieldIn1, int fieldIn2) {
return windowJoin(windowSize, slideInterval, new DefaultTimeStamp<IN1>(),
@@ -526,6 +587,16 @@ public class ConnectedDataStream<IN1, IN2> {
timestamp2);
}
+ SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowJoin(long windowSize, long slideInterval,
+ TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
+
+ JoinWindowFunction<IN1, IN2> joinWindowFunction = new JoinWindowFunction<IN1, IN2>(
+ dataStream1.getOutputType(), dataStream2.getOutputType(), fieldIn1, fieldIn2);
+
+ return addGeneralWindowJoin(joinWindowFunction, windowSize, slideInterval, timestamp1,
+ timestamp2);
+ }
+
private SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c777003..b50c42d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -69,6 +69,7 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.PojoKeySelector;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeWrapper;
@@ -373,6 +374,30 @@ public class DataStream<OUT> {
* @param slideInterval
* After every function call the windows will be slid by this
* interval.
+ * @param fieldIn1
+ * The field in the first stream to be matched.
+ * @param fieldIn2
+ * The field in the second stream to be matched.
+ * @return The transformed {@link DataStream}.
+ */
+ public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+ DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval, String fieldIn1,
+ String fieldIn2) {
+ return this.windowJoin(dataStreamToJoin, windowSize, slideInterval,
+ new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>(), fieldIn1, fieldIn2);
+ }
+
+ /**
+ * Creates a join of a data stream based on the given positions.
+ *
+ * @param dataStreamToJoin
+ * {@link DataStream} to join with.
+ * @param windowSize
+ * Size of the windows that will be aligned for both streams in
+ * milliseconds.
+ * @param slideInterval
+ * After every function call the windows will be slid by this
+ * interval.
* @param timestamp1
* User defined time stamps for the first input.
* @param timestamp2
@@ -391,8 +416,36 @@ public class DataStream<OUT> {
}
/**
- * Sets the partitioning of the {@link DataStream} so that the output tuples
- * are partitioned by the hashcodes of the selected fields.
+ * Creates a join of a data stream based on the given positions.
+ *
+ * @param dataStreamToJoin
+ * {@link DataStream} to join with.
+ * @param windowSize
+ * Size of the windows that will be aligned for both streams in
+ * milliseconds.
+ * @param slideInterval
+ * After every function call the windows will be slid by this
+ * interval.
+ * @param timestamp1
+ * User defined time stamps for the first input.
+ * @param timestamp2
+ * User defined time stamps for the second input.
+ * @param fieldIn1
+ * The field in the first stream to be matched.
+ * @param fieldIn2
+ * The field in the second stream to be matched.
+ * @return The transformed {@link DataStream}.
+ */
+ public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowJoin(
+ DataStream<IN2> dataStreamToJoin, long windowSize, long slideInterval,
+ TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2, String fieldIn1, String fieldIn2) {
+ return this.connect(dataStreamToJoin).windowJoin(windowSize, slideInterval, timestamp1,
+ timestamp2, fieldIn1, fieldIn2);
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned by the selected fields.
*
* @param fields
* The fields to partition by.
@@ -400,10 +453,31 @@ public class DataStream<OUT> {
*/
public DataStream<OUT> partitionBy(int... fields) {
- return setConnectionType(new FieldsPartitioner<OUT>(new FieldsKeySelector<OUT>(
+ return setConnectionType(new FieldsPartitioner<OUT>(FieldsKeySelector.getSelector(
getOutputType(), fields)));
}
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned by the given field expressions.
+ *
+ * @param fields
+ * The fields expressions to partition by.
+ * @return The DataStream with fields partitioning set.
+ */
+ public DataStream<OUT> partitionBy(String... fields) {
+
+ return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(
+ getOutputType(), fields)));
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned using the given {@link KeySelector}.
+ *
+ * @param keySelector
+ * @return
+ */
public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
return setConnectionType(new FieldsPartitioner<OUT>(keySelector));
}
@@ -547,10 +621,30 @@ public class DataStream<OUT> {
* @param fields
* The position of the fields on which the {@link DataStream}
* will be grouped.
- * @return The transformed {@link DataStream}
+ * @return The grouped {@link DataStream}
*/
public GroupedDataStream<OUT> groupBy(int... fields) {
- return groupBy(new FieldsKeySelector<OUT>(getOutputType(), fields));
+
+ return groupBy(FieldsKeySelector.getSelector(getOutputType(), fields));
+
+ }
+
+ /**
+ * Groups a {@link DataStream} using field expressions. A field expression
+ * is either the name of a public field or a getter method with parentheses
+ * of the {@link DataStream}S underlying type. A dot can be used to drill
+ * down into objects, as in {@code "field1.getInnerField2()" }. This method
+ * returns an {@link GroupedDataStream}.
+ *
+ * @param fields
+ * One or more field expressions on which the DataStream will be
+ * grouped.
+ * @return The grouped {@link DataStream}
+ **/
+ public GroupedDataStream<OUT> groupBy(String... fields) {
+
+ return groupBy(new PojoKeySelector<OUT>(getOutputType(), fields));
+
}
/**
@@ -561,7 +655,7 @@ public class DataStream<OUT> {
* @param keySelector
* The {@link KeySelector} that will be used to extract keys for
* the values
- * @return The transformed {@link DataStream}
+ * @return The grouped {@link DataStream}
*/
public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
return new GroupedDataStream<OUT>(this, keySelector);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
index d523b9a..be0e37f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java
@@ -51,8 +51,13 @@ public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
this.timeStamp = windowDataStream.timeStamp;
}
- public WindowDataStream<OUT> groupBy(int keyPosition) {
- return new WindowDataStream<OUT>(dataStream.groupBy(keyPosition), batchSize, slideSize,
+ public WindowDataStream<OUT> groupBy(int... keyPositions) {
+ return new WindowDataStream<OUT>(dataStream.groupBy(keyPositions), batchSize, slideSize,
+ timeStamp);
+ }
+
+ public WindowDataStream<OUT> groupBy(String... fields) {
+ return new WindowDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize,
timeStamp);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
index 3dea04d..cb501c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -24,21 +24,28 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.PojoKeySelector;
import org.apache.flink.util.Collector;
public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
private static final long serialVersionUID = 1L;
- private KeySelector<IN1, Object> keySelector1;
- private KeySelector<IN2, Object> keySelector2;
+ private KeySelector<IN1, ?> keySelector1;
+ private KeySelector<IN2, ?> keySelector2;
public JoinWindowFunction() {
}
public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
int positionIn1, int positionIn2) {
- keySelector1 = new FieldsKeySelector<IN1>(inType1, positionIn1);
- keySelector2 = new FieldsKeySelector<IN2>(inType2, positionIn2);
+ keySelector1 = FieldsKeySelector.getSelector(inType1, positionIn1);
+ keySelector2 = FieldsKeySelector.getSelector(inType2, positionIn2);
+ }
+
+ public JoinWindowFunction(TypeInformation<IN1> inType1, TypeInformation<IN2> inType2,
+ String field1, String field2) {
+ keySelector1 = new PojoKeySelector<IN1>(inType1, field1);
+ keySelector2 = new PojoKeySelector<IN2>(inType2, field2);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
new file mode 100644
index 0000000..6d6f620
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ArrayKeySelector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import java.lang.reflect.Array;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class ArrayKeySelector<IN> extends FieldsKeySelector<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ public ArrayKeySelector(int... fields) {
+ super(fields);
+ }
+
+ @Override
+ public Object getKey(IN value) throws Exception {
+ if (simpleKey) {
+ return Array.get(value, keyFields[0]);
+ } else {
+ int c = 0;
+ for (int pos : keyFields) {
+ ((Tuple) key).setField(Array.get(value, pos), c);
+ c++;
+ }
+ return key;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
index 664d545..d785109 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.util.keys;
-import java.lang.reflect.Array;
-
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,15 +48,13 @@ import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.tuple.Tuple9;
-public class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
+public abstract class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
private static final long serialVersionUID = 1L;
- int[] keyFields;
- boolean isTuple;
- boolean isArray;
- int numberOfKeys;
- Object key;
+ protected int[] keyFields;
+ protected Object key;
+ protected boolean simpleKey;
public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class,
Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
@@ -67,73 +63,40 @@ public class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
Tuple25.class };
- public FieldsKeySelector(boolean isTuple, boolean isArray, int... fields) {
+ public FieldsKeySelector(int... fields) {
this.keyFields = fields;
- this.numberOfKeys = fields.length;
- this.isTuple = isTuple;
- this.isArray = isArray;
-
+ this.simpleKey = fields.length == 1;
for (int i : fields) {
if (i < 0) {
throw new RuntimeException("Grouping fields must be non-negative");
}
}
- if (numberOfKeys > 1) {
- if (!this.isTuple && !this.isArray) {
- throw new RuntimeException(
- "For non-tuple types use single field 0 or KeyExctractor for grouping");
- } else {
- try {
- key = tupleClasses[fields.length - 1].newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage());
- }
- }
- } else {
- if (!this.isTuple && !this.isArray) {
- if (fields[0] > 0) {
- throw new RuntimeException(
- "For simple objects grouping only allowed on the first field");
- }
- }
- key = null;
+ try {
+ key = (Tuple) tupleClasses[fields.length - 1].newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
}
- }
- public FieldsKeySelector(TypeInformation<IN> type, int... fields) {
- this(type.isTupleType(),
- (type instanceof BasicArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo),
- fields);
}
- @Override
- public Object getKey(IN value) throws Exception {
- if (numberOfKeys > 1) {
- int c = 0;
- if (isTuple) {
- for (int pos : keyFields) {
- ((Tuple) key).setField(((Tuple) value).getField(pos), c);
- c++;
- }
- } else {
- // if array type
- for (int pos : keyFields) {
- ((Tuple) key).setField(Array.get(value, pos), c);
- c++;
- }
-
- }
+ public static <R> KeySelector<R, ?> getSelector(TypeInformation<R> type, int... fields) {
+ if (type.isTupleType()) {
+ return new TupleKeySelector<R>(fields);
+ } else if (type instanceof BasicArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo) {
+ return new ArrayKeySelector<R>(fields);
} else {
- if (isTuple) {
- key = ((Tuple) value).getField(keyFields[0]);
- } else if (isArray) {
- key = Array.get(value, keyFields[0]);
+ if (fields.length > 1) {
+ throw new RuntimeException(
+ "For non-tuple types use single field 0 or KeyExctractor for grouping");
+
+ } else if (fields[0] > 0) {
+ throw new RuntimeException(
+ "For simple objects grouping only allowed on the first field");
} else {
- key = value;
+ return new ObjectKeySelector<R>();
}
}
- return key;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
new file mode 100644
index 0000000..cf4ecc6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/ObjectKeySelector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class ObjectKeySelector<IN> implements KeySelector<IN, IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IN getKey(IN value) throws Exception {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
new file mode 100644
index 0000000..618f14f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/PojoKeySelector.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public class PojoKeySelector<IN> extends FieldsKeySelector<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ PojoComparator<IN> comparator;
+
+ public PojoKeySelector(TypeInformation<IN> type, String... fields) {
+ super(new int[removeDuplicates(fields).length]);
+ if (!(type instanceof CompositeType<?>)) {
+ throw new IllegalArgumentException(
+ "Key expressions are only supported on POJO types and Tuples. "
+ + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+ }
+ CompositeType<IN> cType = (CompositeType<IN>) type;
+
+ String[] keyFields = removeDuplicates(fields);
+ int numOfKeys = keyFields.length;
+
+ List<FlatFieldDescriptor> fieldDescriptors = new ArrayList<FlatFieldDescriptor>();
+ for (String field : keyFields) {
+ cType.getKey(field, 0, fieldDescriptors);
+ }
+
+ int[] logicalKeyPositions = new int[numOfKeys];
+ boolean[] orders = new boolean[numOfKeys];
+
+ for (int i = 0; i < numOfKeys; i++) {
+ logicalKeyPositions[i] = fieldDescriptors.get(i).getPosition();
+ }
+
+ if (cType instanceof PojoTypeInfo) {
+ comparator = (PojoComparator<IN>) cType
+ .createComparator(logicalKeyPositions, orders, 0);
+ } else {
+ throw new IllegalArgumentException(
+ "Key expressions are only supported on POJO types. "
+ + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+ }
+
+ }
+
+ @Override
+ public Object getKey(IN value) throws Exception {
+
+ Field[] keyFields = comparator.getKeyFields();
+ if (simpleKey) {
+ return comparator.accessField(keyFields[0], value);
+ } else {
+ int c = 0;
+ for (Field field : keyFields) {
+ ((Tuple) key).setField(comparator.accessField(field, value), c);
+ c++;
+ }
+ }
+ return key;
+ }
+
+ private static String[] removeDuplicates(String[] in) {
+ List<String> ret = new LinkedList<String>();
+ for (String el : in) {
+ if (!ret.contains(el)) {
+ ret.add(el);
+ }
+ }
+ return ret.toArray(new String[ret.size()]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
new file mode 100644
index 0000000..4ca64ef
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/TupleKeySelector.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.keys;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class TupleKeySelector<IN> extends FieldsKeySelector<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ public TupleKeySelector(int... fields) {
+ super(fields);
+ }
+
+ @Override
+ public Object getKey(IN value) throws Exception {
+ if (simpleKey) {
+ return ((Tuple) value).getField(keyFields[0]);
+ } else {
+ int c = 0;
+ for (int pos : keyFields) {
+ ((Tuple) key).setField(((Tuple) value).getField(pos), c);
+ c++;
+ }
+ return key;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 70e6118..95c6f71 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunctio
import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class AggregationFunctionTest {
@@ -114,18 +114,15 @@ public class AggregationFunctionTest {
List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction,
- new FieldsKeySelector<Tuple2<Integer, Integer>>(true, false, 0)),
- getInputList());
+ new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction,
- new FieldsKeySelector<Tuple2<Integer, Integer>>(true, false, 0)),
- getInputList());
+ new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction,
- new FieldsKeySelector<Tuple2<Integer, Integer>>(true, false, 0)),
- getInputList());
+ new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
assertEquals(expectedSumList, sumList);
assertEquals(expectedMinList, minList);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index 229053b..ce01a7d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
import org.apache.flink.streaming.util.MockCoInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class CoGroupedBatchReduceTest {
@@ -96,8 +96,8 @@ public class CoGroupedBatchReduceTest {
expected.add("h");
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
- new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new FieldsKeySelector(true, false, 0),
- new FieldsKeySelector(true, false, 0));
+ new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new TupleKeySelector(0),
+ new TupleKeySelector(0));
List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
@@ -143,8 +143,8 @@ public class CoGroupedBatchReduceTest {
expected.add("fh");
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
- new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new FieldsKeySelector(true, false, 0),
- new FieldsKeySelector(true, false, 0));
+ new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new TupleKeySelector(0),
+ new TupleKeySelector(0));
List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
index e3557c5..4570e23 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.util.MockCoInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class CoGroupedReduceTest {
@@ -72,8 +72,7 @@ public class CoGroupedReduceTest {
Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
- new MyCoReduceFunction(), new FieldsKeySelector(true, false, 0),
- new FieldsKeySelector(true, false, 0));
+ new MyCoReduceFunction(), new TupleKeySelector(0), new TupleKeySelector(0));
List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
"7");
@@ -84,8 +83,7 @@ public class CoGroupedReduceTest {
assertEquals(expected, actualList);
invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
- new MyCoReduceFunction(), new FieldsKeySelector(true, false, 2),
- new FieldsKeySelector(true, false, 0));
+ new MyCoReduceFunction(), new TupleKeySelector(2), new TupleKeySelector(0));
expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index 1469b32..f36a7b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockCoInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class CoGroupedWindowReduceTest {
@@ -125,8 +125,8 @@ public class CoGroupedWindowReduceTest {
expected.add("i");
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
- new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new FieldsKeySelector(true, false, 0),
- new FieldsKeySelector(true, false, 0), new MyTimeStamp<Tuple2<String, Integer>>(
+ new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new TupleKeySelector(0),
+ new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
@@ -178,8 +178,8 @@ public class CoGroupedWindowReduceTest {
expected.add("fh");
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>(
- new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new FieldsKeySelector(true, false, 0),
- new FieldsKeySelector(true, false, 0), new MyTimeStamp<Tuple2<String, Integer>>(
+ new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new TupleKeySelector( 0),
+ new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
index 4c70a5c..db2b8cf 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
@@ -27,7 +27,8 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -72,8 +73,9 @@ public class GroupedBatchGroupReduceTest {
@SuppressWarnings("unchecked")
@Test
public void slidingBatchGroupReduceTest() {
+ @SuppressWarnings("rawtypes")
GroupedBatchGroupReduceInvokable<Integer, String> invokable1 = new GroupedBatchGroupReduceInvokable<Integer, String>(
- new MySlidingBatchReduce1(), 2, 2, new FieldsKeySelector<Integer>(false, false, 0));
+ new MySlidingBatchReduce1(), 2, 2, new ObjectKeySelector());
List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "3", "3", END_OF_GROUP, "2",
END_OF_GROUP);
@@ -83,8 +85,7 @@ public class GroupedBatchGroupReduceTest {
assertEquals(expected, actual);
GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String>(
- new MySlidingBatchReduce2(), 2, 2, new FieldsKeySelector<Tuple2<Integer, String>>(
- true, false, 1));
+ new MySlidingBatchReduce2(), 2, 2, new TupleKeySelector<Tuple2<Integer, String>>(1));
expected = Arrays.asList("open", "1", "2", END_OF_GROUP, "open", "3", "3", END_OF_GROUP,
"open", "4", END_OF_GROUP);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
index 4b1a235..783119c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
@@ -26,7 +26,8 @@ import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class GroupedBatchReduceTest {
@@ -60,7 +61,7 @@ public class GroupedBatchReduceTest {
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
- }, 3, 2, new FieldsKeySelector<Integer>(false, false, 0));
+ }, 3, 2, new ObjectKeySelector<Integer>());
List<Integer> actual = MockInvokable.createAndExecute(invokable, inputs);
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
@@ -94,7 +95,7 @@ public class GroupedBatchReduceTest {
return value2;
}
}
- }, 3, 3, new FieldsKeySelector<Tuple2<Integer, String>>(true, false, 1));
+ }, 3, 3, new TupleKeySelector<Tuple2<Integer, String>>(1));
List<Tuple2<Integer, String>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
index 6aadc43..0b68207 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
import org.junit.Test;
public class GroupedReduceInvokableTest {
@@ -43,7 +43,7 @@ public class GroupedReduceInvokableTest {
@Test
public void test() {
GroupedReduceInvokable<Integer> invokable1 = new GroupedReduceInvokable<Integer>(
- new MyReducer(), new FieldsKeySelector<Integer>(false, false, 0));
+ new MyReducer(), new ObjectKeySelector<Integer>());
List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
List<Integer> actual = MockInvokable.createAndExecute(invokable1,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
index 6054d3b..e93647f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -73,7 +73,7 @@ public class GroupedWindowGroupReduceInvokableTest {
}
out.collect(outTuple);
}
- }, 2, 3, new FieldsKeySelector(true, false, 0),
+ }, 2, 3, new TupleKeySelector( 0),
new TimeStamp<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
index ed1ed8c..5d10eff 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class WindowReduceInvokableTest {
@@ -106,8 +106,7 @@ public class WindowReduceInvokableTest {
Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
}
- }, 2, 3, new FieldsKeySelector(true, false, 0),
- new TimeStamp<Tuple2<String, Integer>>() {
+ }, 2, 3, new TupleKeySelector(0), new TimeStamp<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
index 347debc..18b3015 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Before;
import org.junit.Test;
@@ -42,8 +42,7 @@ public class FieldsPartitionerTest {
@Before
public void setPartitioner() {
- fieldsPartitioner = new FieldsPartitioner<Tuple>(new FieldsKeySelector<Tuple>(true, false,
- 0));
+ fieldsPartitioner = new FieldsPartitioner<Tuple>(new TupleKeySelector<Tuple>(0));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
index 8987921..98b60b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldsKeySelectorTest.java
@@ -18,17 +18,17 @@
package org.apache.flink.streaming.util;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.keys.FieldsKeySelector;
+import org.apache.flink.streaming.util.keys.ObjectKeySelector;
+import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class FieldsKeySelectorTest {
- @SuppressWarnings("unused")
@Test
public void testGetKey() throws Exception {
@@ -36,48 +36,23 @@ public class FieldsKeySelectorTest {
Tuple2<Integer, String> t = new Tuple2<Integer, String>(-1, "a");
double[] a = new double[] { 0.0, 1.2 };
- KeySelector<Integer, ?> ks1 = new FieldsKeySelector<Integer>(TypeExtractor.getForObject(i),
- 0);
-
- try {
- KeySelector<Integer, ?> ks2 = new FieldsKeySelector<Integer>(
- TypeExtractor.getForObject(i), 2, 1);
- fail();
- } catch (RuntimeException e) {
-
- }
-
- try {
- KeySelector<Integer, ?> ks2 = new FieldsKeySelector<Integer>(
- TypeExtractor.getForObject(i), -1);
- fail();
- } catch (RuntimeException e) {
-
- }
+ KeySelector<Integer, ?> ks1 = new ObjectKeySelector<Integer>();
assertEquals(ks1.getKey(i), 5);
- KeySelector<Tuple2<Integer, String>, ?> ks3 = new FieldsKeySelector<Tuple2<Integer, String>>(
- TypeExtractor.getForObject(t), 1);
+ KeySelector<Tuple2<Integer, String>, ?> ks3 = new TupleKeySelector<Tuple2<Integer, String>>(
+ 1);
assertEquals(ks3.getKey(t), "a");
- try {
- KeySelector<Tuple2<Integer, String>, ?> ks2 = new FieldsKeySelector<Tuple2<Integer, String>>(
- TypeExtractor.getForObject(t), 1, -1);
- fail();
- } catch (RuntimeException e) {
-
- }
-
- KeySelector<Tuple2<Integer, String>, ?> ks4 = new FieldsKeySelector<Tuple2<Integer, String>>(
+ KeySelector<Tuple2<Integer, String>, ?> ks4 = FieldsKeySelector.getSelector(
TypeExtractor.getForObject(t), 1, 1);
assertEquals(ks4.getKey(t), new Tuple2<String, String>("a", "a"));
- KeySelector<double[], ?> ks5 = new FieldsKeySelector<double[]>(
+ KeySelector<double[], ?> ks5 = FieldsKeySelector.getSelector(
TypeExtractor.getForObject(a), 0);
assertEquals(ks5.getKey(a), 0.0);
- KeySelector<double[], ?> ks6 = new FieldsKeySelector<double[]>(
+ KeySelector<double[], ?> ks6 = FieldsKeySelector.getSelector(
TypeExtractor.getForObject(a), 1, 0);
assertEquals(ks6.getKey(a), new Tuple2<Double, Double>(1.2, 0.0));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9f3846f/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index 7c15ecd..ae4a806 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -192,7 +192,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
/**
* This method is handling the IllegalAccess exceptions of Field.get()
*/
- private final Object accessField(Field field, Object object) {
+ public final Object accessField(Field field, Object object) {
try {
object = field.get(object);
} catch (NullPointerException npex) {