You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/03 21:33:11 UTC
[2/5] flink git commit: [FLINK-1776] Add offsets to field indexes of
semantic properties for operators with key selectors
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index d686633..60754e6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -21,6 +21,12 @@ package org.apache.flink.api.java.operator;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -33,6 +39,8 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
+import static org.junit.Assert.assertTrue;
+
@SuppressWarnings("serial")
public class CoGroupOperatorTest {
@@ -348,4 +356,128 @@ public class CoGroupOperatorTest {
}
);
}
+
+ @Test
+ public void testSemanticPropsWithKeySelector1() {
+ // Both inputs are keyed by DummyTestKeySelector; forwarded and read fields come from the annotations on DummyTestCoGroupFunction1.
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+ .where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+ .with(new DummyTestCoGroupFunction1());
+ // NOTE(review): every expected index below is the annotated index plus 2, the arity of the Tuple2 key - presumably the key fields precede the record fields; confirm against CoGroupOperator.
+ SemanticProperties semProps = coGroupOp.getSemanticProperties();
+ // First input (index 0), ForwardedFieldsFirst "0->4;1;1->3": expected as source 2 -> 4 and source 3 -> {1, 3}.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
+ // Second input (index 1), ForwardedFieldsSecond "2;4->0": expected as source 4 -> 2 and source 6 -> 0.
+ assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+ // Read fields of the first input, ReadFieldsFirst "0;2;4": expected as 2, 4, 6.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(2));
+ assertTrue(semProps.getReadFields(0).contains(4));
+ assertTrue(semProps.getReadFields(0).contains(6));
+ // Read fields of the second input, ReadFieldsSecond "1;3": expected as 3, 5.
+ assertTrue(semProps.getReadFields(1).size() == 2);
+ assertTrue(semProps.getReadFields(1).contains(3));
+ assertTrue(semProps.getReadFields(1).contains(5));
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector2() {
+ // Same setup as the annotation-based test, but forwarded fields are declared on the operator via withForwardedFieldsFirst/Second.
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+ .where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+ .with(new DummyTestCoGroupFunction2())
+ .withForwardedFieldsFirst("2;4->0")
+ .withForwardedFieldsSecond("0->4;1;1->3");
+ // NOTE(review): every expected index below is the declared index plus 2, the arity of the Tuple2 key - presumably the key fields precede the record fields; confirm against CoGroupOperator.
+ SemanticProperties semProps = coGroupOp.getSemanticProperties();
+ // First input (index 0), declared "2;4->0": expected as source 4 -> 2 and source 6 -> 0.
+ assertTrue(semProps.getForwardingTargetFields(0,0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0,4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0,6).contains(0));
+ // Second input (index 1), declared "0->4;1;1->3": expected as source 2 -> 4 and source 3 -> {1, 3}.
+ assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1,2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(1,3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(1,3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(1,3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(1,4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,6).size() == 0);
+ // Read fields of the first input, ReadFieldsFirst "0;1;2" on DummyTestCoGroupFunction2: expected as 2, 3, 4.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(2));
+ assertTrue(semProps.getReadFields(0).contains(3));
+ assertTrue(semProps.getReadFields(0).contains(4));
+ // DummyTestCoGroupFunction2 declares no read fields for the second input; null is expected for it.
+ assertTrue(semProps.getReadFields(1) == null);
+ }
+
+ public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> { // key selector stub: Tuple5 record in, two-field Tuple2 key out
+ @Override
+ public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+ return new Tuple2<Long, Integer>(); // the argument is ignored; a new Tuple2 is returned on every call
+ }
+ }
+
+ @FunctionAnnotation.ForwardedFieldsFirst("0->4;1;1->3") // first input: field 0 to 4, field 1 to 1 and to 3
+ @FunctionAnnotation.ForwardedFieldsSecond("2;4->0") // second input: field 2 to 2, field 4 to 0
+ @FunctionAnnotation.ReadFieldsFirst("0;2;4") // fields declared as read from the first input
+ @FunctionAnnotation.ReadFieldsSecond("1;3") // fields declared as read from the second input
+ public static class DummyTestCoGroupFunction1
+ implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
+ Tuple5<Integer, Long, String, Long, Integer>,
+ Tuple5<Integer, Long, String, Long, Integer>> {
+ // Co-group stub whose only content is its class-level annotations.
+ @Override
+ public void coGroup(Iterable<Tuple5<Integer, Long, String, Long, Integer>> first,
+ Iterable<Tuple5<Integer, Long, String, Long, Integer>> second,
+ Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+ } // empty body: nothing is read from the inputs or emitted to the collector
+ }
+
+ @FunctionAnnotation.ReadFieldsFirst("0;1;2") // only read fields of the first input are declared; no forwarded-field annotations
+ public static class DummyTestCoGroupFunction2
+ implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
+ Tuple5<Integer, Long, String, Long, Integer>,
+ Tuple5<Integer, Long, String, Long, Integer>> {
+ // Co-group stub used where forwarded fields are declared on the operator instead of on the function.
+ @Override
+ public void coGroup(Iterable<Tuple5<Integer, Long, String, Long, Integer>> first,
+ Iterable<Tuple5<Integer, Long, String, Long, Integer>> second,
+ Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+ } // empty body: nothing is read from the inputs or emitted to the collector
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
new file mode 100644
index 0000000..4870d29
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+public class GroupCombineOperatorTest {
+ // Checks the forwarded and read fields reported by GroupCombineOperator when grouping (and optionally group sorting) uses a KeySelector.
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+ // Type information for the five-field tuples used as (empty) test input.
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+ // NOTE(review): expected indexes are shifted by 2 per KeySelector in use (2 with groupBy only, 4 with groupBy plus sortGroup), matching the Tuple2 key arity - presumably key fields precede the record; confirm against the operator.
+ @Test
+ public void testSemanticPropsWithKeySelector1() {
+ // groupBy(KeySelector) only; forwarded and read fields come from the annotations on DummyGroupCombineFunction1 (shift of 2).
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .combineGroup(new DummyGroupCombineFunction1());
+
+ SemanticProperties semProps = combineOp.getSemanticProperties();
+ // ForwardedFields "0->4;1;1->3;2" looked up by source field: 2 -> 4, 3 -> {1, 3}, 4 -> 2.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+ // The same forwarding looked up by target field; target 0 has no source, asserted as a negative result.
+ assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+ // ReadFields "0;3;4" is expected as 2, 5, 6.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(2));
+ assertTrue(semProps.getReadFields(0).contains(5));
+ assertTrue(semProps.getReadFields(0).contains(6));
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector2() {
+ // groupBy(KeySelector) plus sortGroup(KeySelector); annotations on DummyGroupCombineFunction1 (shift of 4).
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+ .combineGroup(new DummyGroupCombineFunction1());
+
+ SemanticProperties semProps = combineOp.getSemanticProperties();
+ // ForwardedFields "0->4;1;1->3;2" looked up by source field: 4 -> 4, 5 -> {1, 3}, 6 -> 2.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+ // The same forwarding looked up by target field; target 0 has no source, asserted as a negative result.
+ assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+ // ReadFields "0;3;4" is expected as 4, 7, 8.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(4));
+ assertTrue(semProps.getReadFields(0).contains(7));
+ assertTrue(semProps.getReadFields(0).contains(8));
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector3() {
+ // groupBy(KeySelector) only; forwarded fields declared on the operator, read fields annotated on DummyGroupCombineFunction2 (shift of 2).
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .combineGroup(new DummyGroupCombineFunction2())
+ .withForwardedFields("0->4;1;1->3;2");
+
+ SemanticProperties semProps = combineOp.getSemanticProperties();
+ // Declared "0->4;1;1->3;2" looked up by source field: 2 -> 4, 3 -> {1, 3}, 4 -> 2.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+ // The same forwarding looked up by target field; target 0 has no source, asserted as a negative result.
+ assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+ // ReadFields "0;3;4" is expected as 2, 5, 6.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(2));
+ assertTrue(semProps.getReadFields(0).contains(5));
+ assertTrue(semProps.getReadFields(0).contains(6));
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector4() {
+ // groupBy(KeySelector) plus sortGroup(KeySelector); forwarded fields declared on the operator, read fields annotated on DummyGroupCombineFunction2 (shift of 4).
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+ .combineGroup(new DummyGroupCombineFunction2())
+ .withForwardedFields("0->4;1;1->3;2");
+
+ SemanticProperties semProps = combineOp.getSemanticProperties();
+ // Declared "0->4;1;1->3;2" looked up by source field: 4 -> 4, 5 -> {1, 3}, 6 -> 2.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+ // The same forwarding looked up by target field; target 0 has no source, asserted as a negative result.
+ assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+ // ReadFields "0;3;4" is expected as 4, 7, 8.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(4));
+ assertTrue(semProps.getReadFields(0).contains(7));
+ assertTrue(semProps.getReadFields(0).contains(8));
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector5() {
+ // groupBy(KeySelector) only; DummyGroupCombineFunction3 has no annotations, forwarded fields are declared on the operator (shift of 2).
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .combineGroup(new DummyGroupCombineFunction3())
+ .withForwardedFields("4->0;3;3->1;2");
+
+ SemanticProperties semProps = combineOp.getSemanticProperties();
+ // Declared "4->0;3;3->1;2" looked up by source field: 4 -> 2, 5 -> {1, 3}, 6 -> 0.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+ // The same forwarding looked up by target field; target 4 has no source, asserted as a negative result.
+ assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+ // No read fields are declared anywhere for this operator; null is expected.
+ assertTrue(semProps.getReadFields(0) == null);
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector6() {
+ // groupBy(KeySelector) plus sortGroup(KeySelector); DummyGroupCombineFunction3 has no annotations, forwarded fields are declared on the operator (shift of 4).
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+ .combineGroup(new DummyGroupCombineFunction3())
+ .withForwardedFields("4->0;3;3->1;2");
+
+ SemanticProperties semProps = combineOp.getSemanticProperties();
+ // Declared "4->0;3;3->1;2" looked up by source field: 6 -> 2, 7 -> {1, 3}, 8 -> 0.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 7).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 7).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 8).contains(0));
+ // The same forwarding looked up by target field; target 4 has no source, asserted as a negative result.
+ assertTrue(semProps.getForwardingSourceField(0, 0) == 8);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 7);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 7);
+ assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+ // No read fields are declared anywhere for this operator; null is expected.
+ assertTrue(semProps.getReadFields(0) == null);
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector7() {
+ // groupBy(KeySelector) only; DummyGroupCombineFunction4 is annotated with NonForwardedFields "2;4" (shift of 2).
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .combineGroup(new DummyGroupCombineFunction4());
+
+ SemanticProperties semProps = combineOp.getSemanticProperties();
+ // Fields other than 2 and 4 are expected to keep their position: source 2 -> 0, 3 -> 1, 5 -> 3.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+ // The same forwarding looked up by target field; targets 2 and 4 have no source, asserted as negative results.
+ assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+ // No read fields are declared for this function; null is expected.
+ assertTrue(semProps.getReadFields(0) == null);
+ }
+ // Key selector stub: takes a Tuple5 record and returns a new two-field Tuple2 key, ignoring the argument.
+ public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+ @Override
+ public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+ return new Tuple2<Long, Integer>();
+ }
+ }
+ // Combine stub declaring both forwarded and read fields through annotations; the combine body is empty.
+ @FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+ @FunctionAnnotation.ReadFields("0;3;4")
+ public static class DummyGroupCombineFunction1 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+ }
+ }
+ // Combine stub declaring only read fields; the tests add forwarded fields on the operator via withForwardedFields.
+ @FunctionAnnotation.ReadFields("0;3;4")
+ public static class DummyGroupCombineFunction2 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+ }
+ }
+ // Combine stub without any semantic annotations; the combine body is empty.
+ public static class DummyGroupCombineFunction3 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+ }
+ }
+ // Combine stub declaring fields 2 and 4 as not forwarded; the combine body is empty.
+ @FunctionAnnotation.NonForwardedFields("2;4")
+ public static class DummyGroupCombineFunction4 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
new file mode 100644
index 0000000..0bfe566
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+ /**
+  * Checks the semantic properties (forwarded fields and read fields) reported by a
+  * {@link GroupReduceOperator} whose grouping, and optionally group sorting, is defined
+  * by a {@link KeySelector}.
+  *
+  * <p>Every expected input-field index below equals the index declared on the function
+  * (by annotation or {@code withForwardedFields}) shifted by 2 when only {@code groupBy}
+  * uses {@link DummyTestKeySelector}, and by 4 when {@code sortGroup} uses it as well.
+  * Output-field indexes are not shifted.
+  */
+ @SuppressWarnings("serial")
+ public class GroupReduceOperatorTest {
+
+     /** Entry in an expected-source table for an output field that has no forwarding source. */
+     private static final int NOT_FORWARDED = -1;
+
+     private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+             new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+     private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo =
+             new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+                     BasicTypeInfo.INT_TYPE_INFO,
+                     BasicTypeInfo.LONG_TYPE_INFO,
+                     BasicTypeInfo.STRING_TYPE_INFO,
+                     BasicTypeInfo.LONG_TYPE_INFO,
+                     BasicTypeInfo.INT_TYPE_INFO
+             );
+
+     /** groupBy(key selector) + function annotated with forwarded and read fields. */
+     @Test
+     public void testSemanticPropsWithKeySelector1() {
+
+         GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+                 emptyTupleDataSet().groupBy(new DummyTestKeySelector())
+                         .reduceGroup(new DummyGroupReduceFunction1());
+
+         SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+         assertForwardingTargets(semProps, new int[][] {{}, {}, {4}, {1, 3}, {2}, {}, {}});
+         assertForwardingSources(semProps, NOT_FORWARDED, 3, 4, 3, 2);
+         assertReadFields(semProps, 2, 5, 6);
+     }
+
+     /** groupBy + sortGroup (both key selectors) + function annotated with forwarded and read fields. */
+     @Test
+     public void testSemanticPropsWithKeySelector2() {
+
+         GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+                 emptyTupleDataSet().groupBy(new DummyTestKeySelector())
+                         .sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+                         .reduceGroup(new DummyGroupReduceFunction1());
+
+         SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+         assertForwardingTargets(semProps, new int[][] {{}, {}, {}, {}, {4}, {1, 3}, {2}, {}, {}});
+         assertForwardingSources(semProps, NOT_FORWARDED, 5, 6, 5, 4);
+         assertReadFields(semProps, 4, 7, 8);
+     }
+
+     /** groupBy(key selector) + read-fields annotation + forwarded fields given via withForwardedFields. */
+     @Test
+     public void testSemanticPropsWithKeySelector3() {
+
+         GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+                 emptyTupleDataSet().groupBy(new DummyTestKeySelector())
+                         .reduceGroup(new DummyGroupReduceFunction2())
+                         .withForwardedFields("0->4;1;1->3;2");
+
+         SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+         assertForwardingTargets(semProps, new int[][] {{}, {}, {4}, {1, 3}, {2}, {}, {}});
+         assertForwardingSources(semProps, NOT_FORWARDED, 3, 4, 3, 2);
+         assertReadFields(semProps, 2, 5, 6);
+     }
+
+     /** groupBy + sortGroup + read-fields annotation + forwarded fields given via withForwardedFields. */
+     @Test
+     public void testSemanticPropsWithKeySelector4() {
+
+         GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+                 emptyTupleDataSet().groupBy(new DummyTestKeySelector())
+                         .sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+                         .reduceGroup(new DummyGroupReduceFunction2())
+                         .withForwardedFields("0->4;1;1->3;2");
+
+         SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+         assertForwardingTargets(semProps, new int[][] {{}, {}, {}, {}, {4}, {1, 3}, {2}, {}, {}});
+         assertForwardingSources(semProps, NOT_FORWARDED, 5, 6, 5, 4);
+         assertReadFields(semProps, 4, 7, 8);
+     }
+
+     /** groupBy(key selector) + unannotated function + withForwardedFields; no read fields declared. */
+     @Test
+     public void testSemanticPropsWithKeySelector5() {
+
+         GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+                 emptyTupleDataSet().groupBy(new DummyTestKeySelector())
+                         .reduceGroup(new DummyGroupReduceFunction3())
+                         .withForwardedFields("4->0;3;3->1;2");
+
+         SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+         assertForwardingTargets(semProps, new int[][] {{}, {}, {}, {}, {2}, {1, 3}, {0}});
+         assertForwardingSources(semProps, 6, 5, 4, 5, NOT_FORWARDED);
+         assertNoReadFields(semProps);
+     }
+
+     /** groupBy + sortGroup + unannotated function + withForwardedFields; no read fields declared. */
+     @Test
+     public void testSemanticPropsWithKeySelector6() {
+
+         GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+                 emptyTupleDataSet().groupBy(new DummyTestKeySelector())
+                         .sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+                         .reduceGroup(new DummyGroupReduceFunction3())
+                         .withForwardedFields("4->0;3;3->1;2");
+
+         SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+         assertForwardingTargets(semProps, new int[][] {{}, {}, {}, {}, {}, {}, {2}, {1, 3}, {0}});
+         assertForwardingSources(semProps, 8, 7, 6, 7, NOT_FORWARDED);
+         assertNoReadFields(semProps);
+     }
+
+     /** groupBy(key selector) + function annotated with non-forwarded fields; no read fields declared. */
+     @Test
+     public void testSemanticPropsWithKeySelector7() {
+
+         GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+                 emptyTupleDataSet().groupBy(new DummyTestKeySelector())
+                         .reduceGroup(new DummyGroupReduceFunction4());
+
+         SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+         assertForwardingTargets(semProps, new int[][] {{}, {}, {0}, {1}, {}, {3}, {}});
+         assertForwardingSources(semProps, 2, 3, NOT_FORWARDED, 5, NOT_FORWARDED);
+         assertNoReadFields(semProps);
+     }
+
+     /** Creates an empty Tuple5 data set on the environment returned by getExecutionEnvironment(). */
+     private DataSet<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleDataSet() {
+         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+         return env.fromCollection(emptyTupleData, tupleTypeInfo);
+     }
+
+     /**
+      * Asserts the forwarding targets of input 0.
+      *
+      * @param semProps properties under test
+      * @param expectedTargets {@code expectedTargets[i]} lists exactly the output fields that
+      *                        input field {@code i} must be forwarded to (empty = not forwarded);
+      *                        only input fields {@code 0 .. length-1} are checked
+      */
+     private static void assertForwardingTargets(SemanticProperties semProps, int[][] expectedTargets) {
+         for (int source = 0; source < expectedTargets.length; source++) {
+             final int[] targets = expectedTargets[source];
+             assertTrue("input field " + source + ": expected " + targets.length
+                             + " forwarding target(s) but got " + semProps.getForwardingTargetFields(0, source),
+                     semProps.getForwardingTargetFields(0, source).size() == targets.length);
+             for (int target : targets) {
+                 assertTrue("input field " + source + " should be forwarded to output field " + target
+                                 + " but targets are " + semProps.getForwardingTargetFields(0, source),
+                         semProps.getForwardingTargetFields(0, source).contains(target));
+             }
+         }
+     }
+
+     /**
+      * Asserts the forwarding source of each output field with respect to input 0.
+      *
+      * @param semProps properties under test
+      * @param expectedSources {@code expectedSources[t]} is the input field that output field
+      *                        {@code t} must originate from, or {@link #NOT_FORWARDED} (any
+      *                        negative value) if the reported source must be negative
+      */
+     private static void assertForwardingSources(SemanticProperties semProps, int... expectedSources) {
+         for (int target = 0; target < expectedSources.length; target++) {
+             final int actual = semProps.getForwardingSourceField(0, target);
+             if (expectedSources[target] < 0) {
+                 assertTrue("output field " + target + " should have no source but got " + actual,
+                         actual < 0);
+             } else {
+                 assertTrue("output field " + target + ": expected source " + expectedSources[target]
+                                 + " but got " + actual,
+                         actual == expectedSources[target]);
+             }
+         }
+     }
+
+     /**
+      * Asserts that the read fields of input 0 are exactly {@code expectedFields}.
+      *
+      * @param semProps properties under test
+      * @param expectedFields the (distinct) input fields that must be reported as read
+      */
+     private static void assertReadFields(SemanticProperties semProps, int... expectedFields) {
+         assertTrue("expected " + expectedFields.length + " read field(s) but got " + semProps.getReadFields(0),
+                 semProps.getReadFields(0).size() == expectedFields.length);
+         for (int field : expectedFields) {
+             assertTrue("input field " + field + " should be a read field but read fields are "
+                             + semProps.getReadFields(0),
+                     semProps.getReadFields(0).contains(field));
+         }
+     }
+
+     /** Asserts that no read-field information is available for input 0 (getReadFields returns null). */
+     private static void assertNoReadFields(SemanticProperties semProps) {
+         assertTrue("expected no read-field information but got " + semProps.getReadFields(0),
+                 semProps.getReadFields(0) == null);
+     }
+
+     /** Key selector stub with a {@code Tuple2<Long, Integer>} key type; always returns a new, empty tuple. */
+     public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+         @Override
+         public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+             return new Tuple2<Long, Integer>();
+         }
+     }
+
+     /** Reduce stub carrying both forwarded-field and read-field annotations; emits nothing. */
+     @FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+     @FunctionAnnotation.ReadFields("0;3;4")
+     public static class DummyGroupReduceFunction1 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+         @Override
+         public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+         }
+     }
+
+     /** Reduce stub carrying only a read-field annotation; emits nothing. */
+     @FunctionAnnotation.ReadFields("0;3;4")
+     public static class DummyGroupReduceFunction2 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+         @Override
+         public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+         }
+     }
+
+     /** Reduce stub without any semantic annotation; emits nothing. */
+     public static class DummyGroupReduceFunction3 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+         @Override
+         public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+         }
+     }
+
+     /** Reduce stub carrying only a non-forwarded-field annotation; emits nothing. */
+     @FunctionAnnotation.NonForwardedFields("2;4")
+     public static class DummyGroupReduceFunction4 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+         @Override
+         public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index 3d4551d..f1aadca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -23,20 +23,29 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
@SuppressWarnings("serial")
public class JoinOperatorTest {
@@ -298,7 +307,7 @@ public class JoinOperatorTest {
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
try {
TypeInformation<?> t = ds1.join(ds2).where("f0.myInt").equalTo(4).getType();
- Assert.assertTrue("not a composite type", t instanceof CompositeType);
+ assertTrue("not a composite type", t instanceof CompositeType);
} catch(Exception e) {
e.printStackTrace();
Assert.fail();
@@ -946,7 +955,136 @@ public class JoinOperatorTest {
.projectFirst(0)
.projectSecond(-1);
}
-
+
+ @Test
+ public void testSemanticPropsWithKeySelector1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ JoinOperator<?,?,?> joinOp = tupleDs1.join(tupleDs2)
+ .where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+ .with(new DummyTestJoinFunction1());
+
+ SemanticProperties semProps = joinOp.getSemanticProperties();
+
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
+
+ assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(2));
+ assertTrue(semProps.getReadFields(0).contains(4));
+ assertTrue(semProps.getReadFields(0).contains(6));
+
+ assertTrue(semProps.getReadFields(1).size() == 2);
+ assertTrue(semProps.getReadFields(1).contains(3));
+ assertTrue(semProps.getReadFields(1).contains(5));
+ }
+
+ /**
+  * Join with key selectors on both sides and {@code DummyTestJoinFunction2}, with forwarded
+  * fields supplied through {@code withForwardedFieldsFirst/Second}: verifies the forwarding
+  * targets of both inputs, the read fields of input 0, and that input 1 reports none.
+  */
+ @Test
+ public void testSemanticPropsWithKeySelector2() {
+
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     DataSet<Tuple5<Integer, Long, String, Long, Integer>> left = env.fromCollection(emptyTupleData, tupleTypeInfo);
+     DataSet<Tuple5<Integer, Long, String, Long, Integer>> right = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+     JoinOperator<?,?,?> join = left.join(right)
+             .where(new DummyTestKeySelector())
+             .equalTo(new DummyTestKeySelector())
+             .with(new DummyTestJoinFunction2())
+             .withForwardedFieldsFirst("2;4->0")
+             .withForwardedFieldsSecond("0->4;1;1->3");
+
+     SemanticProperties props = join.getSemanticProperties();
+
+     // expectedTargets[input][sourceField] = output positions that field is forwarded to
+     final int[][][] expectedTargets = {
+         { {}, {}, {}, {}, {2}, {}, {0} },
+         { {}, {}, {4}, {1, 3}, {}, {}, {} }
+     };
+     for (int input = 0; input < expectedTargets.length; input++) {
+         for (int field = 0; field < expectedTargets[input].length; field++) {
+             final int[] targets = expectedTargets[input][field];
+             assertTrue(props.getForwardingTargetFields(input, field).size() == targets.length);
+             for (int target : targets) {
+                 assertTrue(props.getForwardingTargetFields(input, field).contains(target));
+             }
+         }
+     }
+
+     // read fields are declared for the first input only
+     final int[] expectedReadsFirst = {2, 3, 4};
+     assertTrue(props.getReadFields(0).size() == expectedReadsFirst.length);
+     for (int field : expectedReadsFirst) {
+         assertTrue(props.getReadFields(0).contains(field));
+     }
+
+     assertTrue(props.getReadFields(1) == null);
+ }
+
+ /**
+  * Projection join with key selectors on both sides: verifies the forwarding targets that
+  * result from the chained {@code projectFirst}/{@code projectSecond} calls for both inputs.
+  */
+ @Test
+ public void testSemanticPropsWithKeySelector3() {
+
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     DataSet<Tuple5<Integer, Long, String, Long, Integer>> left = env.fromCollection(emptyTupleData, tupleTypeInfo);
+     DataSet<Tuple5<Integer, Long, String, Long, Integer>> right = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+     JoinOperator<?, ?, ? extends Tuple> join = left.join(right)
+             .where(new DummyTestKeySelector())
+             .equalTo(new DummyTestKeySelector())
+             .projectFirst(2)
+             .projectSecond(0, 0, 3)
+             .projectFirst(0, 4)
+             .projectSecond(2);
+
+     SemanticProperties props = join.getSemanticProperties();
+
+     // expectedTargets[input][sourceField] = output positions that field is forwarded to
+     final int[][][] expectedTargets = {
+         { {}, {}, {4}, {}, {0}, {}, {5} },
+         { {}, {}, {1, 2}, {}, {6}, {3}, {} }
+     };
+     for (int input = 0; input < expectedTargets.length; input++) {
+         for (int field = 0; field < expectedTargets[input].length; field++) {
+             final int[] targets = expectedTargets[input][field];
+             assertTrue(props.getForwardingTargetFields(input, field).size() == targets.length);
+             for (int target : targets) {
+                 assertTrue(props.getForwardingTargetFields(input, field).contains(target));
+             }
+         }
+     }
+ }
+
/*
* ####################################################################
*/
@@ -1001,8 +1139,8 @@ public class JoinOperatorTest {
public NestedCustomType nested;
public String myString;
public Object nothing;
- // public List<String> countries; need Kryo to support this
- // public Writable interfaceTest; need kryo
+ public List<String> countries;
+ public Writable interfaceTest;
public CustomType() {};
@@ -1010,6 +1148,8 @@ public class JoinOperatorTest {
myInt = i;
myLong = l;
myString = s;
+ countries = null;
+ interfaceTest = null;
nested = new NestedCustomType(i, l, s);
}
@@ -1046,5 +1186,40 @@ public class JoinOperatorTest {
}
}
+ public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+ @Override
+ public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+ return new Tuple2<Long, Integer>();
+ }
+ }
+
+ /**
+  * Join stub annotated with forwarded fields and read fields for both inputs.
+  * Both arguments are ignored; a new, empty tuple is returned on every call.
+  */
+ @FunctionAnnotation.ForwardedFieldsFirst("0->4;1;1->3")
+ @FunctionAnnotation.ForwardedFieldsSecond("2;4->0")
+ @FunctionAnnotation.ReadFieldsFirst("0;2;4")
+ @FunctionAnnotation.ReadFieldsSecond("1;3")
+ public static class DummyTestJoinFunction1 implements JoinFunction<
+         Tuple5<Integer, Long, String, Long, Integer>,
+         Tuple5<Integer, Long, String, Long, Integer>,
+         Tuple5<Integer, Long, String, Long, Integer>> {
+
+     @Override
+     public Tuple5<Integer, Long, String, Long, Integer> join(
+             Tuple5<Integer, Long, String, Long, Integer> first,
+             Tuple5<Integer, Long, String, Long, Integer> second) throws Exception {
+         final Tuple5<Integer, Long, String, Long, Integer> emptyResult =
+                 new Tuple5<Integer, Long, String, Long, Integer>();
+         return emptyResult;
+     }
+ }
+
+ /**
+  * Join stub annotated only with read fields for the first input.
+  * Both arguments are ignored; a new, empty tuple is returned on every call.
+  */
+ @FunctionAnnotation.ReadFieldsFirst("0;1;2")
+ public static class DummyTestJoinFunction2 implements JoinFunction<
+         Tuple5<Integer, Long, String, Long, Integer>,
+         Tuple5<Integer, Long, String, Long, Integer>,
+         Tuple5<Integer, Long, String, Long, Integer>> {
+
+     @Override
+     public Tuple5<Integer, Long, String, Long, Integer> join(
+             Tuple5<Integer, Long, String, Long, Integer> first,
+             Tuple5<Integer, Long, String, Long, Integer> second) throws Exception {
+         final Tuple5<Integer, Long, String, Long, Integer> emptyResult =
+                 new Tuple5<Integer, Long, String, Long, Integer>();
+         return emptyResult;
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
new file mode 100644
index 0000000..dafc1f2
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+public class ReduceOperatorTest {
+ // Tests for the SemanticProperties reported by reduce() on a DataSet that was grouped with a KeySelector.
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>(); // stays empty; no test in this class calls env.execute()
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ @Test
+ public void testSemanticPropsWithKeySelector1() {
+ // Forwarded and read fields both come from the annotations on DummyReduceFunction1.
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .reduce(new DummyReduceFunction1());
+
+ SemanticProperties semProps = reduceOp.getSemanticProperties();
+ // Annotated field i is looked up at input index i + 2 (presumably shifted past the two fields of the Tuple2 key - confirm): 2->4, 3->{1,3}, 4->2.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+ // Reverse lookup per output field; output 0 has no forwarded source, so a negative index is expected.
+ assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+ // ReadFields("0;3;4") is expected with the same shift of 2: {2, 5, 6}.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(2));
+ assertTrue(semProps.getReadFields(0).contains(5));
+ assertTrue(semProps.getReadFields(0).contains(6));
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector2() {
+ // Forwarded fields are supplied via withForwardedFields(); read fields come from the annotation on DummyReduceFunction2.
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .reduce(new DummyReduceFunction2())
+ .withForwardedFields("0->4;1;1->3;2");
+ // Same forwarded-field string as the ForwardedFields annotation on DummyReduceFunction1, so the same results are expected.
+ SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+ // Output 0 is the only output field without a forwarded source.
+ assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+ // ReadFields("0;3;4") is expected with the shift of 2: {2, 5, 6}.
+ assertTrue(semProps.getReadFields(0).size() == 3);
+ assertTrue(semProps.getReadFields(0).contains(2));
+ assertTrue(semProps.getReadFields(0).contains(5));
+ assertTrue(semProps.getReadFields(0).contains(6));
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector3() {
+ // DummyReduceFunction3 has no annotations; forwarded fields are supplied only via withForwardedFields().
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .reduce(new DummyReduceFunction3())
+ .withForwardedFields("4->0;3;3->1;2");
+ // "4->0;3;3->1;2" with the shift of 2 is expected as 4->2, 5->{1,3}, 6->0; indexes 0-3 have no targets.
+ SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+ // Output 4 is the only output field without a forwarded source.
+ assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+ // No read fields were declared anywhere, and null (not an empty set) is expected.
+ assertTrue(semProps.getReadFields(0) == null);
+ }
+
+ @Test
+ public void testSemanticPropsWithKeySelector4() {
+ // DummyReduceFunction4 declares only NonForwardedFields("2;4").
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ tupleDs.groupBy(new DummyTestKeySelector())
+ .reduce(new DummyReduceFunction4());
+
+ SemanticProperties semProps = reduceOp.getSemanticProperties();
+ // Expected with the shift of 2: fields 0, 1 and 3 keep their position (2->0, 3->1, 5->3); indexes 4 and 6 have no targets.
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+ // Outputs 2 and 4 (the fields named in NonForwardedFields) have no forwarded source.
+ assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
+ assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+ assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
+ assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+ assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+ // No ReadFields annotation is present on DummyReduceFunction4, so null is expected.
+ assertTrue(semProps.getReadFields(0) == null);
+ }
+
+ public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+ @Override
+ public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+ return new Tuple2<Long, Integer>(); // stub: ignores value; the key tuple is built with the no-arg constructor
+ }
+ }
+
+ @FunctionAnnotation.ForwardedFields("0->4;1;1->3;2") // field 0 -> output 4; field 1 -> outputs 1 and 3; field 2 -> output 2
+ @FunctionAnnotation.ReadFields("0;3;4") // fields 0, 3 and 4 are declared as read
+ public static class DummyReduceFunction1 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+ Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+ return new Tuple5<Integer, Long, String, Long, Integer>(); // dummy result; v1 and v2 are not used
+ }
+ }
+
+ @FunctionAnnotation.ReadFields("0;3;4") // read fields only; forwarded fields are added by the test via withForwardedFields()
+ public static class DummyReduceFunction2 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+ Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+ return new Tuple5<Integer, Long, String, Long, Integer>(); // dummy result; v1 and v2 are not used
+ }
+ }
+ // No semantic annotations on this function.
+ public static class DummyReduceFunction3 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+ Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+ return new Tuple5<Integer, Long, String, Long, Integer>(); // dummy result; v1 and v2 are not used
+ }
+ }
+
+ @FunctionAnnotation.NonForwardedFields("2;4") // fields 2 and 4 are excluded; testSemanticPropsWithKeySelector4 expects 0, 1 and 3 to be forwarded in place
+ public static class DummyReduceFunction4 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+ @Override
+ public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+ Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+ return new Tuple5<Integer, Long, String, Long, Integer>(); // dummy result; v1 and v2 are not used
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index c5067f9..9eb9a37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -1005,6 +1005,64 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
}
+ @Test
+ public void testGroupReduceSelectorKeysWithSemProps() throws Exception {
+
+ /*
+ * Test that semantic properties are correctly adapted when using Selector Keys
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> reduceDs = ds
+ // group by selector key
+ .groupBy(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Long>() {
+ @Override
+ public Long getKey(Tuple5<Integer, Long, Integer, String, Long> v) throws Exception {
+ return (v.f0*v.f1)-(v.f2*v.f4); // key is computed from f0, f1, f2 and f4; it is not itself a tuple field
+ }
+ })
+ .reduceGroup(
+ new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>>() {
+ @Override
+ public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) throws Exception {
+ for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
+ out.collect(v); // every record is emitted unchanged
+ }
+ }
+ })
+ // add forward field information
+ .withForwardedFields("0") // field 0 is declared as forwarded to output field 0
+ // group again and reduce
+ .groupBy(0).reduceGroup(
+ new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+ @Override
+ public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
+ int k = 0;
+ long s = 0;
+ for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
+ k = v.f0; // keeps the f0 of the last record seen
+ s += v.f1; // running sum of f1
+ }
+ out.collect(new Tuple2<Integer, Long>(k, s)); // one (f0, sum of f1) record per reduce call
+ }
+ }
+ );
+
+ reduceDs.writeAsCsv(resultPath);
+
+ env.execute();
+ // expected is only assigned here; presumably the test base class compares it with the output at resultPath after the test - confirm
+ expected = "1,1\n" +
+ "2,5\n" +
+ "3,15\n" +
+ "4,34\n" +
+ "5,65\n";
+
+ }
+
public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
public void reduce(