You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/03 21:33:11 UTC

[2/5] flink git commit: [FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index d686633..60754e6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -21,6 +21,12 @@ package org.apache.flink.api.java.operator;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -33,6 +39,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
 
+import static org.junit.Assert.assertTrue;
+
 @SuppressWarnings("serial")
 public class CoGroupOperatorTest {
 
@@ -348,4 +356,128 @@ public class CoGroupOperatorTest {
 					}
 				);
 	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestCoGroupFunction1());
+
+		SemanticProperties semProps = coGroupOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(6));
+
+		assertTrue(semProps.getReadFields(1).size() == 2);
+		assertTrue(semProps.getReadFields(1).contains(3));
+		assertTrue(semProps.getReadFields(1).contains(5));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestCoGroupFunction2())
+				.withForwardedFieldsFirst("2;4->0")
+				.withForwardedFieldsSecond("0->4;1;1->3");
+
+		SemanticProperties semProps = coGroupOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,6).contains(0));
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 0);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(3));
+		assertTrue(semProps.getReadFields(0).contains(4));
+
+		assertTrue(semProps.getReadFields(1) == null);
+	}
+
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFieldsFirst("0->4;1;1->3")
+	@FunctionAnnotation.ForwardedFieldsSecond("2;4->0")
+	@FunctionAnnotation.ReadFieldsFirst("0;2;4")
+	@FunctionAnnotation.ReadFieldsSecond("1;3")
+	public static class DummyTestCoGroupFunction1
+			implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
+						Tuple5<Integer, Long, String, Long, Integer>,
+						Tuple5<Integer, Long, String, Long, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple5<Integer, Long, String, Long, Integer>> first,
+							Iterable<Tuple5<Integer, Long, String, Long, Integer>> second,
+							Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.ReadFieldsFirst("0;1;2")
+	public static class DummyTestCoGroupFunction2
+			implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple5<Integer, Long, String, Long, Integer>> first,
+							Iterable<Tuple5<Integer, Long, String, Long, Integer>> second,
+							Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
new file mode 100644
index 0000000..4870d29
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+public class GroupCombineOperatorTest {
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction1());
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.combineGroup(new DummyGroupCombineFunction1());
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.combineGroup(new DummyGroupCombineFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector6() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.combineGroup(new DummyGroupCombineFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 8);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector7() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.combineGroup(new DummyGroupCombineFunction4());
+
+		SemanticProperties semProps = combineOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupCombineFunction1 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupCombineFunction2 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	public static class DummyGroupCombineFunction3 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.NonForwardedFields("2;4")
+	public static class DummyGroupCombineFunction4 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
new file mode 100644
index 0000000..0bfe566
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+public class GroupReduceOperatorTest {
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction1());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.reduceGroup(new DummyGroupReduceFunction1());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.reduceGroup(new DummyGroupReduceFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 4);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(7));
+		assertTrue(semProps.getReadFields(0).contains(8));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector5() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector6() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
+						.reduceGroup(new DummyGroupReduceFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 7).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 8).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 8);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 7);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector7() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduceGroup(new DummyGroupReduceFunction4());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupReduceFunction1 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyGroupReduceFunction2 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	public static class DummyGroupReduceFunction3 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+
+	@FunctionAnnotation.NonForwardedFields("2;4")
+	public static class DummyGroupReduceFunction4 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index 3d4551d..f1aadca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -23,20 +23,29 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 @SuppressWarnings("serial")
 public class JoinOperatorTest {
 
@@ -298,7 +307,7 @@ public class JoinOperatorTest {
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
 		try {
 			TypeInformation<?> t = ds1.join(ds2).where("f0.myInt").equalTo(4).getType();
-			Assert.assertTrue("not a composite type", t instanceof CompositeType);
+			assertTrue("not a composite type", t instanceof CompositeType);
 		} catch(Exception e) {
 			e.printStackTrace();
 			Assert.fail();
@@ -946,7 +955,136 @@ public class JoinOperatorTest {
 		.projectFirst(0)
 		.projectSecond(-1);
 	}
-	
+
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		JoinOperator<?,?,?> joinOp = tupleDs1.join(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestJoinFunction1());
+
+		SemanticProperties semProps = joinOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(4));
+		assertTrue(semProps.getReadFields(0).contains(6));
+
+		assertTrue(semProps.getReadFields(1).size() == 2);
+		assertTrue(semProps.getReadFields(1).contains(3));
+		assertTrue(semProps.getReadFields(1).contains(5));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		JoinOperator<?,?,?> joinOp = tupleDs1.join(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.with(new DummyTestJoinFunction2())
+					.withForwardedFieldsFirst("2;4->0")
+					.withForwardedFieldsSecond("0->4;1;1->3");
+
+		SemanticProperties semProps = joinOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0,6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0,6).contains(0));
+
+		assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1,2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(1,3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(1,3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(1,4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1,6).size() == 0);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(3));
+		assertTrue(semProps.getReadFields(0).contains(4));
+
+		assertTrue(semProps.getReadFields(1) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		JoinOperator<?, ?, ? extends Tuple> joinOp = tupleDs1.join(tupleDs2)
+				.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
+				.projectFirst(2)
+				.projectSecond(0, 0, 3)
+				.projectFirst(0, 4)
+				.projectSecond(2);
+
+		SemanticProperties semProps = joinOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(5));
+
+		assertTrue(semProps.getForwardingTargetFields(1, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 2).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(1, 2).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(1, 2).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(1, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(1, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1, 4).contains(6));
+		assertTrue(semProps.getForwardingTargetFields(1, 5).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(1, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(1, 6).size() == 0);
+
+	}
+
 	/*
 	 * ####################################################################
 	 */
@@ -1001,8 +1139,8 @@ public class JoinOperatorTest {
 		public NestedCustomType nested;
 		public String myString;
 		public Object nothing;
-	//	public List<String> countries; need Kryo to support this
-	//	public Writable interfaceTest; need kryo
+		public List<String> countries;
+		public Writable interfaceTest;
 		
 		public CustomType() {};
 		
@@ -1010,6 +1148,8 @@ public class JoinOperatorTest {
 			myInt = i;
 			myLong = l;
 			myString = s;
+			countries = null;
+			interfaceTest = null;
 			nested = new NestedCustomType(i, l, s);
 		}
 		
@@ -1046,5 +1186,40 @@ public class JoinOperatorTest {
 		}
 	}
 
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFieldsFirst("0->4;1;1->3")
+	@FunctionAnnotation.ForwardedFieldsSecond("2;4->0")
+	@FunctionAnnotation.ReadFieldsFirst("0;2;4")
+	@FunctionAnnotation.ReadFieldsSecond("1;3")
+	public static class DummyTestJoinFunction1
+			implements JoinFunction<Tuple5<Integer, Long, String, Long, Integer>,
+									Tuple5<Integer, Long, String, Long, Integer>,
+									Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> join(
+				Tuple5<Integer, Long, String, Long, Integer> first,
+				Tuple5<Integer, Long, String, Long, Integer> second) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ReadFieldsFirst("0;1;2")
+	public static class DummyTestJoinFunction2
+			implements JoinFunction<Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>,
+			Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> join(
+				Tuple5<Integer, Long, String, Long, Integer> first,
+				Tuple5<Integer, Long, String, Long, Integer> second) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
new file mode 100644
index 0000000..dafc1f2
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("serial")
+public class ReduceOperatorTest {
+
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	@Test
+	public void testSemanticPropsWithKeySelector1() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction1());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction2())
+							.withForwardedFields("0->4;1;1->3;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 4) == 2);
+
+		assertTrue(semProps.getReadFields(0).size() == 3);
+		assertTrue(semProps.getReadFields(0).contains(2));
+		assertTrue(semProps.getReadFields(0).contains(5));
+		assertTrue(semProps.getReadFields(0).contains(6));
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction3())
+						.withForwardedFields("4->0;3;3->1;2");
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	@Test
+	public void testSemanticPropsWithKeySelector4() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		ReduceOperator<Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+				tupleDs.groupBy(new DummyTestKeySelector())
+						.reduce(new DummyReduceFunction4());
+
+		SemanticProperties semProps = reduceOp.getSemanticProperties();
+
+		assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
+		assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+		assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
+		assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
+		assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+		assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
+		assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
+		assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
+		assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
+		assertTrue(semProps.getForwardingSourceField(0, 4) < 0);
+
+		assertTrue(semProps.getReadFields(0) == null);
+	}
+
+	public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
+			return new Tuple2<Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyReduceFunction1 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+																	Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.ReadFields("0;3;4")
+	public static class DummyReduceFunction2 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+																   Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
+
+	public static class DummyReduceFunction3 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+																   Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
+
+	@FunctionAnnotation.NonForwardedFields("2;4")
+	public static class DummyReduceFunction4 implements ReduceFunction<Tuple5<Integer, Long, String, Long, Integer>> {
+		@Override
+		public Tuple5<Integer, Long, String, Long, Integer> reduce(Tuple5<Integer, Long, String, Long, Integer> v1,
+																   Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+			return new Tuple5<Integer, Long, String, Long, Integer>();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index c5067f9..9eb9a37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -1005,6 +1005,64 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 				"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
 	}
 
+	@Test
+	public void testGroupReduceSelectorKeysWithSemProps() throws Exception {
+
+		/*
+		 * Test that semantic properties are correctly adapted when using Selector Keys
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<Integer, Long>> reduceDs = ds
+				// group by selector key
+				.groupBy(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Long>() {
+					@Override
+					public Long getKey(Tuple5<Integer, Long, Integer, String, Long> v) throws Exception {
+						return (v.f0*v.f1)-(v.f2*v.f4);
+					}
+				})
+				.reduceGroup(
+						new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>>() {
+							@Override
+							public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) throws Exception {
+								for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
+									out.collect(v);
+								}
+							}
+						})
+				// add forward field information
+				.withForwardedFields("0")
+				// group again and reduce
+				.groupBy(0).reduceGroup(
+						new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+							@Override
+							public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
+								int k = 0;
+								long s = 0;
+								for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
+									k = v.f0;
+									s += v.f1;
+								}
+								out.collect(new Tuple2<Integer, Long>(k, s));
+							}
+						}
+				);
+
+		reduceDs.writeAsCsv(resultPath);
+
+		env.execute();
+
+		expected = "1,1\n" +
+				"2,5\n" +
+				"3,15\n" +
+				"4,34\n" +
+				"5,65\n";
+
+	}
+
 	public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
 		@Override
 		public void reduce(